diff --git a/.gitignore b/.gitignore
index c3d3a1c0..1d2cdc69 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,14 @@
target/
.project
.classpath
+.cache
+.target/
+*~
+.#*
+.*.swp
+.DS_Store
+.codefellow
+.ensime*
+.eprj
+.history
+.idea
diff --git a/COPYING b/COPYING
new file mode 100644
index 00000000..0e259d42
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,121 @@
+Creative Commons Legal Code
+
+CC0 1.0 Universal
+
+ CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
+ LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
+ ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
+ INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
+ REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
+ PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
+ THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
+ HEREUNDER.
+
+Statement of Purpose
+
+The laws of most jurisdictions throughout the world automatically confer
+exclusive Copyright and Related Rights (defined below) upon the creator
+and subsequent owner(s) (each and all, an "owner") of an original work of
+authorship and/or a database (each, a "Work").
+
+Certain owners wish to permanently relinquish those rights to a Work for
+the purpose of contributing to a commons of creative, cultural and
+scientific works ("Commons") that the public can reliably and without fear
+of later claims of infringement build upon, modify, incorporate in other
+works, reuse and redistribute as freely as possible in any form whatsoever
+and for any purposes, including without limitation commercial purposes.
+These owners may contribute to the Commons to promote the ideal of a free
+culture and the further production of creative, cultural and scientific
+works, or to gain reputation or greater distribution for their Work in
+part through the use and efforts of others.
+
+For these and/or other purposes and motivations, and without any
+expectation of additional consideration or compensation, the person
+associating CC0 with a Work (the "Affirmer"), to the extent that he or she
+is an owner of Copyright and Related Rights in the Work, voluntarily
+elects to apply CC0 to the Work and publicly distribute the Work under its
+terms, with knowledge of his or her Copyright and Related Rights in the
+Work and the meaning and intended legal effect of CC0 on those rights.
+
+1. Copyright and Related Rights. A Work made available under CC0 may be
+protected by copyright and related or neighboring rights ("Copyright and
+Related Rights"). Copyright and Related Rights include, but are not
+limited to, the following:
+
+ i. the right to reproduce, adapt, distribute, perform, display,
+ communicate, and translate a Work;
+ ii. moral rights retained by the original author(s) and/or performer(s);
+iii. publicity and privacy rights pertaining to a person's image or
+ likeness depicted in a Work;
+ iv. rights protecting against unfair competition in regards to a Work,
+ subject to the limitations in paragraph 4(a), below;
+ v. rights protecting the extraction, dissemination, use and reuse of data
+ in a Work;
+ vi. database rights (such as those arising under Directive 96/9/EC of the
+ European Parliament and of the Council of 11 March 1996 on the legal
+ protection of databases, and under any national implementation
+ thereof, including any amended or successor version of such
+ directive); and
+vii. other similar, equivalent or corresponding rights throughout the
+ world based on applicable law or treaty, and any national
+ implementations thereof.
+
+2. Waiver. To the greatest extent permitted by, but not in contravention
+of, applicable law, Affirmer hereby overtly, fully, permanently,
+irrevocably and unconditionally waives, abandons, and surrenders all of
+Affirmer's Copyright and Related Rights and associated claims and causes
+of action, whether now known or unknown (including existing as well as
+future claims and causes of action), in the Work (i) in all territories
+worldwide, (ii) for the maximum duration provided by applicable law or
+treaty (including future time extensions), (iii) in any current or future
+medium and for any number of copies, and (iv) for any purpose whatsoever,
+including without limitation commercial, advertising or promotional
+purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
+member of the public at large and to the detriment of Affirmer's heirs and
+successors, fully intending that such Waiver shall not be subject to
+revocation, rescission, cancellation, termination, or any other legal or
+equitable action to disrupt the quiet enjoyment of the Work by the public
+as contemplated by Affirmer's express Statement of Purpose.
+
+3. Public License Fallback. Should any part of the Waiver for any reason
+be judged legally invalid or ineffective under applicable law, then the
+Waiver shall be preserved to the maximum extent permitted taking into
+account Affirmer's express Statement of Purpose. In addition, to the
+extent the Waiver is so judged Affirmer hereby grants to each affected
+person a royalty-free, non transferable, non sublicensable, non exclusive,
+irrevocable and unconditional license to exercise Affirmer's Copyright and
+Related Rights in the Work (i) in all territories worldwide, (ii) for the
+maximum duration provided by applicable law or treaty (including future
+time extensions), (iii) in any current or future medium and for any number
+of copies, and (iv) for any purpose whatsoever, including without
+limitation commercial, advertising or promotional purposes (the
+"License"). The License shall be deemed effective as of the date CC0 was
+applied by Affirmer to the Work. Should any part of the License for any
+reason be judged legally invalid or ineffective under applicable law, such
+partial invalidity or ineffectiveness shall not invalidate the remainder
+of the License, and in such case Affirmer hereby affirms that he or she
+will not (i) exercise any of his or her remaining Copyright and Related
+Rights in the Work or (ii) assert any associated claims and causes of
+action with respect to the Work, in either case contrary to Affirmer's
+express Statement of Purpose.
+
+4. Limitations and Disclaimers.
+
+ a. No trademark or patent rights held by Affirmer are waived, abandoned,
+ surrendered, licensed or otherwise affected by this document.
+ b. Affirmer offers the Work as-is and makes no representations or
+ warranties of any kind concerning the Work, express, implied,
+ statutory or otherwise, including without limitation warranties of
+ title, merchantability, fitness for a particular purpose, non
+ infringement, or the absence of latent or other defects, accuracy, or
+ the present or absence of errors, whether or not discoverable, all to
+ the greatest extent permissible under applicable law.
+ c. Affirmer disclaims responsibility for clearing rights of other persons
+ that may apply to the Work or any use thereof, including without
+ limitation any person's Copyright and Related Rights in the Work.
+ Further, Affirmer disclaims responsibility for obtaining any necessary
+ consents, permissions or other rights required for any use of the
+ Work.
+ d. Affirmer understands and acknowledges that Creative Commons is not a
+ party to this document and has no duty or obligation with respect to
+ this CC0 or use of the Work.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 00000000..696f2c0e
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,8 @@
+Licensed under Public Domain (CC0)
+
+To the extent possible under law, the person who associated CC0 with
+this code has waived all copyright and related or neighboring
+rights to this code.
+
+You should have received a copy of the CC0 legalcode along with this
+work. If not, see .
diff --git a/README.md b/README.md
new file mode 100644
index 00000000..b8e85fa5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,109 @@
+# Reactive Streams #
+
+The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
+
+## Goals, Design and Scope ##
+
+Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.
+
+The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.
+
+It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.
+
+It should be noted that the precise nature of stream manipulations (transformation, splitting, merging, etc.) is not covered by this specification. Reactive Streams are only concerned with mediating the stream of data between different processing elements. In their development care has been taken to ensure that all basic ways of combining streams can be expressed.
+
+In summary, Reactive Streams is a standard and specification for Stream-oriented libraries for the JVM that
+
+ - process a potentially unbounded number of elements
+ - in sequence,
+ - asynchronously passing elements between components,
+ - with mandatory non-blocking backpressure.
+
+The Reactive Streams specification consists of the following parts:
+
+**The SPI** defines the interoperablility layer between different implementations.
+
+**The API** specifies the types that the users of Reactive Stream libraries use.
+
+***The Technology Compatibility Kit (TCK)*** is a standard test suite for conformance testing of implementations.
+
+Implementations are free to implement additional features not covered by the specification as long as they conform to the API and SPI requirements and pass the tests in the TCK.
+
+#### Comparison with related technologies ####
+
+In contrast to reactive streams described in this document, a Future represents exactly one element (or a failure) that is produced asynchronosly while streams can provide a potentially unbounded number of elements.
+
+Compared to Rx, the SPI described here prescribes a mandatory, non-blocking way to handle back-pressure and requires the processing of an element by a dowstream component to be dispatched asynchronously.
+
+Iteratees are an abstraction used for consuming a stream, often for parsing it. In this sense they are not a stream transformation or combination tool in themselves.
+
+### SPI Components ###
+
+The SPI consists of components that are required to be provided by Reactive Stream implementations but these interfaces should not be exposed to libraries or user code that *use* a Reactive Streams implementation. The reason for this is that the methods used on the SPI level have very strict and rather complex semantic requirements which are likely to be violated by end users.
+
+The components of the SPI are:
+
+ - Publisher
+ - Subscriber
+ - Subscription
+
+A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them). It must eventually clean up its resources after all of its subscribers have been unsubscribed and shut down. A Publisher will typically support fanning out to multiple Subscribers in order to support the dynamic assembly of processing networks from building blocks that can freely be shared.
+
+A *Subscriber* is a component that accepts a sequenced stream of elements provided by a Publisher. At any given time a Subscriber might be subscribed to at most one Publisher. It provides the callback onNext to be called by the upstream Producer, accepting an element that is to be asynchronously processed or enqueued without blocking the Producer.
+
+A Subscriber communicates demand to the Publisher via a *Subscription* which is passed to the Subscriber after the subscription has been established. The Subscription exposes the requestMore(int) method that is used by the Subscriber to signal demand to the Publisher. For each of its subscribers the Publisher obeys the following invariant:
+
+*If N is the total number of demand tokens handed to the Publisher P by a Subscriber S during the time period up to a time T, then the number of onNext calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the Producer separately for each of its subscribers.*
+
+Subscribers that do not currently have an active subscription may subscribe to a Publisher. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated.
+
+> In practice there is a difference between the guarantees that different publishers can provide for subscribers attached at different points in time. For example Publishers serving elements from a strict collection (“cold”) might guarantee that all subscribers see *exactly* the same elements (unless unsubscribed before completion) since they can replay the elements from the collection at any point in time. Other publishers might represent an ephemeral source of elements (e.g. a “hot” TCP stream) and keep only a limited output buffer to replay for future subscribers.
+
+At any time the Publisher may signal that it is not able to provide more elements. This is done by invoking onComplete on its subscribers.
+
+> For example a Publisher representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete.
+
+### API components ###
+
+The purpose of the API is to provide the types that users interact with directly. SPI methods and interfaces should not be exposed expect for the purpose of writing Reactive Streams implementations.
+
+The API counterpart for Publisher is *Producer* and for Subscriber is *Consumer*. The combination of these two—a stream processing element with asynchronous input and output—is called *Processor*.
+
+### Asynchronous processing ###
+
+The Reactive Streams SPI prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The Subscriber should make sure to minimize the amount of processing steps used to initiate this process, meaning that all its SPI-mandated methods shall return as quickly as possible.
+
+In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the Subscription: the subscriber controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling requestMore(int)).
+
+#### Relationship to synchronous stream-processing ####
+
+This document describes asynchronous, non-blocking backpressure boundaries but in between those boundaries any kind of synchronous stream processing model is permitted. This is useful for performance optimization (eliminating inter-thread synchronization) and it conveniently transports backpressure implicitly (the calling method cannot continue while the call lasts). As an example consider a section consisting of three connected Processors, A, B and C:
+
+ (...) --> A[S1 --> S2] --> B[S3 --> S4 --> S5] --> C[S6] --> (...)
+
+Processor B is implemented in terms of three synchronous steps S3, S4 and S5. When communicating with its upstream Producer A, or its downstream Subscriber C it obeys the asynchronous, back-pressure aware requirements of the SPI, but internally it drives the synchronous stream of S3, S4, S5.
+
+> Please note that processing usually happens pipelined between A, B and C: assuming a stream of elements (E1, E2, E3) A might start processing E2 while C still processes E1. On the other hand inside A execution can be completely synchronous, so E3 might be only processed by S1 until E2 has left S2.
+
+### Subscriber controlled queue bounds ###
+
+One of the underlying design principles is that all buffer sizes are to be bounded and these bounds must be *known* and *controlled* by the subscribers. These bounds are expressed in terms of *element count* (which in turn translates to the invocation count of onNext). Any implementation that aims to support infinite streams (especially high output rate streams) needs to enforce bounds all along the way to avoid out-of-memory errors and constrain resource usage in general.
+
+Since back-pressure is mandatory the use of unbounded buffers can be avoided. In general, the only time when a queue might grow without bounds is when the publisher side maintains a higher rate than the subscriber for an extended period of time, but this scenario is handled by backpressure instead.
+
+Queue bounds can be controlled by a subscriber by signaling demand for the appropriate number of elements. At any point in time the subscriber knows:
+
+ - the total number of elements requested: `P`
+ - the number of elements that have been processed: `N`
+
+Then the maximum number of elements that may arrive—until more demand is signaled to the Publisher—is `P - N`. In the case that the subscriber also knows the number of elements B in its input buffer then this bound can be refined to `P - B - N`.
+
+These bounds must be respected by a publisher independent of whether the source it represents can be backpressured or not. In the case of sources whose production rate cannot be influenced—for example clock ticks or mouse movement—the publisher must choose to either buffer or drop elements to obey the imposed bounds.
+
+Subscribers signaling a demand for one element after the reception of an element effectively implement a Stop-and-Wait protocol where the demand signal is equivalent to acknowledgement. By providing demand for multiple elements the cost of acknowledgement is amortized. It is worth noting that the subscriber is allowed to signal demand at any point in time, allowing it to avoid unnecessary delays between the publisher and the subscriber (i.e. keeping its input buffer filled without having to wait for full round-trips).
+
+> Systems that use a signal to notify the publisher to suspend publishing cannot guarantee bounded queues. Since there is a delay between the time at which the signal has been raised and when it is processed, there is a window of time during which an arbitrary number of elements can be passed to the subscriber.
+
+## Legal
+
+This project is a collaboration between Netflix, Twitter, RedHat, Pivotal and Typesafe. The code is offered to the Public Domain in order to allow free use by interested parties who want to create compatible implementations. For details see `COPYING`.
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 00000000..2826cd7f
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,13 @@
+organization := "org.asyncrx"
+
+version := "0.1-SNAPSHOT"
+
+licenses := Seq("CC0" -> url("http://creativecommons.org/publicdomain/zero/1.0/"))
+
+homepage := Some(url("https://groups.google.com/forum/?hl=en#!forum/reactive-streams"))
+
+scalaVersion := "2.10.3"
+
+lazy val spi = project
+
+lazy val tck = project.dependsOn(spi)
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 00000000..37b489cb
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.1
diff --git a/spi/build.sbt b/spi/build.sbt
new file mode 100644
index 00000000..b6bbbd00
--- /dev/null
+++ b/spi/build.sbt
@@ -0,0 +1,3 @@
+organization := "asyncrx"
+
+name := "reactive-streams-spi"
diff --git a/spi/src/main/scala/asyncrx/api/Processor.scala b/spi/src/main/scala/asyncrx/api/Processor.scala
new file mode 100644
index 00000000..68b093ef
--- /dev/null
+++ b/spi/src/main/scala/asyncrx/api/Processor.scala
@@ -0,0 +1,9 @@
+package asyncrx.api
+
+/**
+ * A Processor is a stand-alone representation of a transformation for
+ * elements from In to Out types. Implementations of this API will provide
+ * factory methods for creating Processors and connecting them to
+ * [[Producer]] and [[Consumer]].
+ */
+trait Processor[In, Out] extends Consumer[In] with Producer[Out]
diff --git a/spi/src/main/scala/asyncrx/api/Producer.scala b/spi/src/main/scala/asyncrx/api/Producer.scala
new file mode 100644
index 00000000..6f9b95e7
--- /dev/null
+++ b/spi/src/main/scala/asyncrx/api/Producer.scala
@@ -0,0 +1,37 @@
+package asyncrx
+package api
+
+/**
+ * A Producer is the logical source of elements of a given type.
+ * The underlying implementation is done by way of a [[asyncrx.spi.Publisher]].
+ * This interface is the user-level API for a source while a Publisher is the
+ * SPI.
+ *
+ * Implementations of this interface will typically offer domain- or language-specific
+ * methods for transforming or otherwise interacting with the produced stream of elements.
+ */
+trait Producer[T] {
+
+ /**
+ * Get the underlying Publisher for this Producer. This method should only be used by
+ * implementations of this API.
+ */
+ def getPublisher: spi.Publisher[T]
+}
+
+/**
+ * A Consumer is the logical sink of elements of a given type.
+ * The underlying implementation is done by way of a [[asyncrx.spi.Subscriber]].
+ * This interface is the user-level API for a sink while a Subscriber is the SPI.
+ *
+ * Implementations of this interface will typically offer domain- or language-specific
+ * methods for transforming or otherwise interacting with the stream of elements.
+ */
+trait Consumer[T] {
+
+ /**
+ * Get the underlying Subscriber for this Consumer. This method should only be used by
+ * implementations of this API.
+ */
+ def getSubscriber: spi.Subscriber[T]
+}
diff --git a/spi/src/main/scala/asyncrx/spi/Publisher.scala b/spi/src/main/scala/asyncrx/spi/Publisher.scala
new file mode 100644
index 00000000..c29b06f8
--- /dev/null
+++ b/spi/src/main/scala/asyncrx/spi/Publisher.scala
@@ -0,0 +1,80 @@
+package asyncrx.spi
+
+/**
+ * A Subscription models the relationship between a [[Publisher]] and a [[Subscriber]].
+ * The Subscriber receives a Subscription so that it can ask for elements to be delivered
+ * using [[Subscription#requestMore]]. The Subscription can be disposed of by canceling it.
+ */
+trait Subscription {
+
+ /**
+ * Cancel this subscription. The [[Publisher]] to which produced this Subscription
+ * will eventually stop sending more elements to the [[Subscriber]] which owns
+ * this Subscription. This may happen before the requested number of elements has
+ * been delivered, even if the Publisher would still have more elements.
+ */
+ def cancel(): Unit
+
+ /**
+ * Request more data from the [[Publisher]] which produced this Subscription.
+ * The number of requested elements is cumulative to the number requested previously.
+ * The Publisher may eventually publish up to the requested number of elements to
+ * the [[Subscriber]] which owns this Subscription.
+ */
+ def requestMore(elements: Int): Unit
+}
+
+/**
+ * A Publisher is a source of elements of a given type. One or more [[Subscriber]] may be connected
+ * to this Publisher in order to receive the published elements, contingent on availability of these
+ * elements as well as the presence of demand signaled by the Subscriber via [[Subscription#requestMore]].
+ */
+trait Publisher[T] {
+
+ /**
+ * Subscribe the given [[Subscriber]] to this Publisher. A Subscriber can at most be subscribed once
+ * to a given Publisher, and to at most one Publisher in total.
+ */
+ def subscribe(subscriber: Subscriber[T]): Unit
+}
+
+/**
+ * A Subscriber receives elements from a [[Publisher]] based on the [[Subscription]] it has.
+ * The Publisher may supply elements as they become available, the Subscriber signals demand via
+ * [[Subscription#requestMore]] and elements from when supply and demand are both present.
+ */
+trait Subscriber[T] {
+
+ /**
+ * The [[Publisher]] generates a [[Subscription]] upon [[Publisher#subscribe]] and passes
+ * it on to the Subscriber named there using this method. The Publisher may choose to reject
+ * the subscription request by calling [[#onError]] instead.
+ */
+ def onSubscribe(subscription: Subscription): Unit
+
+ /**
+ * The [[Publisher]] calls this method to pass one element to this Subscriber. The element
+ * must not be null. The Publisher must not call this method more often than
+ * the Subscriber has signaled demand for via the corresponding [[Subscription]].
+ */
+ def onNext(element: T): Unit
+
+ /**
+ * The [[Publisher]] calls this method in order to signal that it terminated normally.
+ * No more elements will be forthcoming and none of the Subscriber’s methods will be
+ * called hereafter.
+ */
+ def onComplete(): Unit
+
+ /**
+ * The [[Publisher]] calls this method to signal that the stream of elements has failed
+ * and is being aborted. The Subscriber should abort its processing as soon as possible.
+ * No more elements will be forthcoming and none of the Subscriber’s methods will be
+ * called hereafter.
+ *
+ * This method is not intended to pass validation errors or similar from Publisher to Subscriber
+ * in order to initiate an orderly shutdown of the exchange; it is intended only for fatal
+ * failure conditions which make it impossible to continue processing further elements.
+ */
+ def onError(cause: Throwable): Unit
+}
diff --git a/tck/build.sbt b/tck/build.sbt
new file mode 100644
index 00000000..d74008ce
--- /dev/null
+++ b/tck/build.sbt
@@ -0,0 +1,5 @@
+organization := "asyncrx"
+
+name := "reactive-streams-tck"
+
+libraryDependencies += "org.testng" % "testng" % "5.14.10"
diff --git a/tck/src/main/resources/spec.md b/tck/src/main/resources/spec.md
new file mode 100644
index 00000000..b15dcd46
--- /dev/null
+++ b/tck/src/main/resources/spec.md
@@ -0,0 +1,148 @@
+## Reactive-Streams SPI Specification
+
+Clarification of terminology used throughout this document:
+
+* "Publisher": an implementation of the `rx.async.spi.Publisher` interface
+* "Subscriber": an implementation of the `rx.async.spi.Subscriber` interface
+* "Subscription": an implementation of the `rx.async.spi.Subscription` interface
+* "Processor": an implementation of the `rx.async.api.Processor` interface
+* "subscriber": a Subscriber which is currently subscribed, i.e. has an active Subscription
+* Foo::bar: an instance method `bar` on the class `Foo`
+
+
+## Specification Rules
+
+### Publisher::subscribe(Subscriber)
+
+* when Publisher is in `completed` state
+ * must not call `onSubscribe` on the given Subscriber
+ * must trigger a call to `onComplete` on the given Subscriber
+
+* when Publisher is in `error` state
+ * must not call `onSubscribe` on the given Subscriber
+ * must trigger a call to `onError` on the given Subscriber
+
+* when Publisher is in `shut-down` state
+ * must not call `onSubscribe` on the given Subscriber
+ * must trigger a call to `onError` with a `java.lang.IllegalStateException` on the given
+ Subscriber
+
+* when Publisher is neither in `completed`, `error` nor `shut-down` state
+ * must trigger a call to `onSubscribe` on the given Subscriber if the Subscription is to be accepted
+ * must trigger a call to `onError` on the given Subscriber if the Subscription is to be rejected
+ * must reject the Subscription with a `java.lang.IllegalStateException` if the same Subscriber already
+ has an active Subscription [1]
+
+
+### Subscription::requestMore(Int)
+
+* when Subscription is cancelled
+ * must ignore the call
+* when Subscription is not cancelled
+ * must register the given number of additional elements to be produced to the respective subscriber
+ * must throw a `java.lang.IllegalArgumentException` if the argument is <= 0
+ * is allowed to synchronously call `onNext` on this (or other) subscriber(s) if and only if the
+ next element is already available
+ * is allowed to synchronously call `onComplete` or `onError` on this (or other) subscriber(s)
+
+
+### Subscription::cancel
+
+* when Subscription is cancelled
+ * must ignore the call
+* when Subscription is not cancelled
+ * the Publisher must eventually cease to call any methods on the corresponding subscriber
+ * the Publisher must eventually drop any references to the corresponding subscriber
+ * the Publisher must obey the "a Subscription::cancel happens-before any subsequent
+ Publisher::subscribe" rule [2]
+ * the Publisher must shut itself down if the given Subscription is the last downstream Subscription [3]
+
+
+### A Publisher
+
+* must not call `onNext`
+ * more times than the total number of elements that was previously requested with
+ Subscription::requestMore by the corresponding subscriber
+ * after having issued an `onComplete` or `onError` call on a subscriber
+
+* must produce the same elements in the same sequence for all its subscribers [4]
+* must support a pending element count up to 2^63-1 (Long.MAX_VALUE) and provide for overflow protection
+* must call `onComplete` on a subscriber after having produced the final stream element to it [5]
+* must call `onComplete` on a subscriber at the earliest possible point in time [6]
+* must start producing with the oldest element still available for a new subscriber
+* must call `onError` on all its subscribers if it encounters a non-recoverable error
+* must not call `onComplete` or `onError` more than once per subscriber
+
+
+### Subscriber::onSubscribe(Subscription), Subscriber::onNext(T)
+
+* must asynchronously schedule a respective event to the subscriber
+* must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
+
+
+### Subscriber::onComplete, Subscriber::onError(Throwable)
+
+* must asynchronously schedule a respective event to the Subscriber
+* must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
+* must consider the Subscription cancelled after having received the event
+
+
+### A Subscriber
+
+* must not accept an `onSubscribe` event if it already has an active Subscription [7]
+* must call Subscription::cancel during shutdown if it still has an active Subscription
+* must ensure that all calls on a Subscription take place from the same thread or provide for respective
+ external synchronization [8]
+* must be prepared to receive one or more `onNext` events after having called Subscription::cancel [9]
+* must be prepared to receive an `onComplete` event with or without a preceding Subscription::requestMore call
+* must be prepared to receive an `onError` event with or without a preceding Subscription::requestMore call
+* must make sure that all calls on its `onXXX` methods happen-before the processing of the respective
+ events [10]
+
+
+### A Processor
+
+* must obey all Publisher rules on its producing side
+* must obey all Subscriber rules on its consuming side
+* must cancel its upstream Subscription if its last downstream Subscription has been cancelled [11]
+* must immediately pass on `onError` events received from its upstream to its downstream [12]
+* must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not
+ requested anything yet
+
+### Generally
+
+* all SPI methods should neither block nor run expensive logic on the calling thread [13]
+
+### Footnotes
+
+1. Reference equality is to be used for establishing whether two Subscribers are the "same".
+
+2. I.e. when seen from the perspective on one thread a `subscribe(...)` on a Publisher must not "overtake"
+ a `cancel()` on a Subscription from that Publisher. Without this happens-before rule cancelling a Subscription and
+ immediately resubscribing to a Publisher might fail as subscribing the same Subscriber twice is disallowed.
+
+3. Explicitly adding "keep-alive" Subscribers can prevent automatic shutdown if required.
+
+4. Producing the stream elements at (temporarily) differing rates to different subscribers is allowed.
+
+5. The final stream element must be the same for all Subscribers.
+
+6. In particular a Publisher should not wait for another Subscription::requestMore call before calling `onComplete`
+ if the information that no more elements will follow is already available before.
+
+7. I.e. one Subscriber cannot be subscribed to multiple Publishers at the same time.
+ What exactly "not accepting" means is left to the implementation but should include behavior that makes the user
+ aware of the usage error (e.g. by logging, throwing an exception or similar).
+
+8. The Subscription implementation is not required to be thread-safe.
+
+9. if there are still requested elements pending
+
+10. I.e. the Subscriber must take care of properly publishing the event to its processing logic.
+ ("happen-before" is used here as defined by the JLS chapter 17.4.5)
+
+11. In addition to shutting itself down as required by the Publisher rules.
+
+12. I.e. errors must not be treated as "in-stream" events that are allowed to be buffered.
+
+13. I.e. they are supposed to return control to the caller quickly.
diff --git a/tck/src/main/scala/asyncrx/tck/IdentityProcessorVerification.scala b/tck/src/main/scala/asyncrx/tck/IdentityProcessorVerification.scala
new file mode 100644
index 00000000..1cdb97ee
--- /dev/null
+++ b/tck/src/main/scala/asyncrx/tck/IdentityProcessorVerification.scala
@@ -0,0 +1,233 @@
+package asyncrx.tck
+
+import asyncrx.spi._
+import asyncrx.api._
+import org.testng.annotations.Test
+
+abstract class IdentityProcessorVerification[T] extends PublisherVerification[T] with SubscriberVerification[T] { verification ⇒
+
+ // TODO: make the timeouts be dilate-able so that one can tune the suite for the machine it runs on
+
+ /**
+ * This is the main method you must implement in your test incarnation.
+ * It must create a Processor, which simply forwards all stream elements from its upstream
+ * to its downstream. It must be able to internally buffer the given number of elements.
+ */
+ def createIdentityProcessor(bufferSize: Int): Processor[T, T]
+
+ /**
+ * Helper method required for running the Publisher rules against a Processor.
+ * It must create a Publisher for a stream with exactly the given number of elements.
+ * If `elements` is zero the produced stream must be infinite.
+ * The stream must not produce the same element twice (in case of an infinite stream this requirement
+ * is relaxed to only apply to the elements that are actually requested during all tests).
+ */
+ def createHelperPublisher(elements: Int): Publisher[T]
+
+ ////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
+
+ // A Processor
+ // must obey all Publisher rules on its producing side
+ def createPublisher(elements: Int): Publisher[T] = {
+ val processor = createIdentityProcessor(bufferSize = 16)
+ val pub = createHelperPublisher(elements)
+ pub.subscribe(processor.getSubscriber)
+ processor.getPublisher // we run the PublisherVerification against this
+ }
+
+ // A Publisher
+ // must support a pending element count up to 2^63-1 (Long.MAX_VALUE) and provide for overflow protection
+ @Test
+ override def mustSupportAPendingElementCountUpToLongMaxValue(): Unit =
+ new TestSetup(bufferSize = 1) {
+ val sub = newSubscriber()
+ sub.requestMore(Int.MaxValue)
+ sub.requestMore(Int.MaxValue)
+ sub.requestMore(2) // if the Subscription only keeps an Int counter without overflow protection it will now be at zero
+
+ val x = nextT()
+ sendNext(x)
+ expectNextElement(sub, x)
+
+ val y = nextT()
+ sendNext(y)
+ expectNextElement(sub, y)
+ }
+
+ // A Publisher
+ // must call `onError` on all its subscribers if it encounters a non-recoverable error
+ @Test
+ override def mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError(): Unit =
+ new TestSetup(bufferSize = 1) {
+ val sub1 = new ManualSubscriber[T] with SubscriptionSupport[T] with ErrorCollection[T]
+ verification.subscribe(processor.getPublisher, sub1)
+ val sub2 = new ManualSubscriber[T] with SubscriptionSupport[T] with ErrorCollection[T]
+ verification.subscribe(processor.getPublisher, sub2)
+
+ sub1.requestMore(1)
+ expectRequestMore(1)
+ val x = nextT()
+ sendNext(x)
+ expectNextElement(sub1, x)
+ sub1.requestMore(1)
+
+ // sub1 now has received and element and has 1 pending
+ // sub2 has not yet requested anything
+
+ val ex = new RuntimeException
+ sendError(ex)
+ sub1.expectError(ex)
+ sub2.expectError(ex)
+
+ verifyNoAsyncErrors()
+ }
+
+ ////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
+
+ // A Processor
+ // must obey all Subscriber rules on its consuming side
+ def createSubscriber(probe: SubscriberVerification.SubscriberProbe[T]): Subscriber[T] = {
+ val processor = createIdentityProcessor(bufferSize = 2)
+ processor.getPublisher.subscribe {
+ new Subscriber[T] {
+ def onSubscribe(subscription: Subscription): Unit =
+ probe.registerOnSubscribe {
+ new SubscriberVerification.SubscriberPuppet {
+ def triggerShutdown(): Unit = subscription.cancel() // TODO: improve
+ def triggerRequestMore(elements: Int): Unit = subscription.requestMore(elements)
+ def triggerCancel(): Unit = subscription.cancel()
+ }
+ }
+ def onNext(element: T): Unit = probe.registerOnNext(element)
+ def onComplete(): Unit = probe.registerOnComplete()
+ def onError(cause: Throwable): Unit = probe.registerOnError(cause)
+ }
+ }
+ processor.getSubscriber // we run the SubscriberVerification against this
+ }
+
+ ////////////////////// OTHER SPEC RULE VERIFICATION ///////////////////////////
+
+ // A Processor
+ // must cancel its upstream Subscription if its last downstream Subscription has been cancelled
+ @Test
+ def mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled(): Unit =
+ new TestSetup(bufferSize = 1) {
+ val sub = newSubscriber()
+ sub.cancel()
+ expectCancelling()
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Processor
+ // must immediately pass on `onError` events received from its upstream to its downstream
+ @Test
+ def mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream(): Unit =
+ new TestSetup(bufferSize = 1) {
+ val sub = new ManualSubscriber[T] with SubscriptionSupport[T] with ErrorCollection[T]
+ verification.subscribe(processor.getPublisher, sub)
+
+ val ex = new RuntimeException
+ sendError(ex)
+ sub.expectError(ex) // "immediately", i.e. without a preceding requestMore
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Processor
+ // must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not requested anything yet
+ @Test
+ def mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstreamSubscriberHasNotRequestedYet(): Unit =
+ new TestSetup(bufferSize = 2) {
+ val sub = newSubscriber()
+ val x = nextT()
+ sendNext(x)
+ sub.expectNone(withinMillis = 50)
+ val y = nextT()
+ sendNext(y)
+ sub.expectNone(withinMillis = 50)
+
+ sub.requestMore(2)
+ sub.expectNext(x)
+ sub.expectNext(y)
+
+ verifyNoAsyncErrors()
+ }
+
+ /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
+
+ @Test // trigger `requestFromUpstream` for elements that have been requested 'long ago'
+ def mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo(): Unit =
+ new TestSetup(bufferSize = 1) {
+ val sub1 = newSubscriber()
+ sub1.requestMore(5)
+
+ expectRequestMore(1)
+ val x = nextT()
+ sendNext(x)
+ expectNextElement(sub1, x)
+
+ expectRequestMore(1)
+ val y = nextT()
+ sendNext(y)
+ expectNextElement(sub1, y)
+
+ expectRequestMore(1)
+
+ val sub2 = newSubscriber()
+
+ // sub1 now has 3 pending
+ // sub2 has 0 pending
+
+ val z = nextT()
+ sendNext(z)
+ expectNextElement(sub1, z)
+ sub2.expectNone() // since sub2 hasn't requested anything yet
+
+ sub2.requestMore(1)
+ expectNextElement(sub2, z)
+
+ expectRequestMore(1) // because sub1 still has 2 pending
+
+ verifyNoAsyncErrors()
+ }
+
+ @Test // unblock the stream if a 'blocking' subscription has been cancelled
+ def mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled(): Unit =
+ new TestSetup(bufferSize = 1) {
+ val sub1 = newSubscriber()
+ val sub2 = newSubscriber()
+
+ sub1.requestMore(5)
+ expectRequestMore(1)
+ sendNext(nextT())
+
+ expectNoRequestMore() // because we only have buffer size 1 and sub2 hasn't seen the first value yet
+ sub2.cancel() // must "unblock"
+ expectRequestMore(1)
+
+ verifyNoAsyncErrors()
+ }
+
+ /////////////////////// TEST INFRASTRUCTURE //////////////////////
+
+ abstract class TestSetup(bufferSize: Int) extends ManualPublisher[T] {
+ private val tees = newManualSubscriber(createHelperPublisher(0)) // gives us access to an infinite stream of T values
+ private var seenTees = Set.empty[T]
+ val processor = createIdentityProcessor(bufferSize)
+ subscribe(processor.getSubscriber)
+
+ def newSubscriber() = newManualSubscriber(processor.getPublisher)
+ def nextT(): T = {
+ val t = tees.requestNextElement()
+ if (seenTees.contains(t)) flop(s"Helper publisher illegally produced the same element $t twice")
+ seenTees += t
+ t
+ }
+ def expectNextElement(sub: ManualSubscriber[T], expected: T): Unit = {
+ val elem = sub.nextElement()
+ if (elem != expected) flop(s"Received `onNext($elem)` on downstream but expected `onNext($expected)`")
+ }
+ }
+}
\ No newline at end of file
diff --git a/tck/src/main/scala/asyncrx/tck/PublisherVerification.scala b/tck/src/main/scala/asyncrx/tck/PublisherVerification.scala
new file mode 100644
index 00000000..bdc5be99
--- /dev/null
+++ b/tck/src/main/scala/asyncrx/tck/PublisherVerification.scala
@@ -0,0 +1,390 @@
+package asyncrx.tck
+
+import java.lang.ref.{ Reference, WeakReference, ReferenceQueue }
+import org.testng.Assert._
+import org.testng.annotations.Test
+
+import asyncrx.spi.{ Publisher, Subscription }
+
+trait PublisherVerification[T] extends TestEnvironment {
+
+ // TODO: make the timeouts be dilate-able so that one can tune the suite for the machine it runs on
+
+ /**
+ * This is the main method you must implement in your test incarnation.
+ * It must create a Publisher for a stream with exactly the given number of elements.
+ * If `elements` is zero the produced stream must be infinite.
+ */
+ def createPublisher(elements: Int): Publisher[T]
+
+ /**
+ * Override in your test if you want to enable completed-state verification.
+ * If you don't override the respective tests will be ignored.
+ */
+ def createCompletedStatePublisher(): Publisher[T] = null
+
+ /**
+ * Override in your test if you want to enable error-state verification.
+ * If you don't override the respective tests will be ignored.
+ */
+ def createErrorStatePublisher(): Publisher[T] = null
+
+ ////////////////////// TEST SETUP VERIFICATION ///////////////////////////
+
+ @Test
+ def createPublisher3MustProduceAStreamOfExactly3Elements(): Unit =
+ activePublisherTest(elements = 3) { pub ⇒
+ val sub = newManualSubscriber(pub)
+ def requestNextElementOrEndOfStream() =
+ sub.requestNextElementOrEndOfStream(errorMsg = s"Timeout while waiting for next element from Publisher $pub")
+ assertTrue(requestNextElementOrEndOfStream().isDefined, s"Publisher $pub produced no elements")
+ assertTrue(requestNextElementOrEndOfStream().isDefined, s"Publisher $pub produced only 1 element")
+ assertTrue(requestNextElementOrEndOfStream().isDefined, s"Publisher $pub produced only 2 elements")
+ sub.expectCompletion()
+ }
+
+ ////////////////////// SPEC RULE VERIFICATION ///////////////////////////
+
+ // Publisher::subscribe(Subscriber)
+ // when Publisher is in `completed` state
+ // must not call `onSubscribe` on the given Subscriber
+ // must trigger a call to `onComplete` on the given Subscriber
+ @Test
+ def publisherSubscribeWhenCompletedMustTriggerOnCompleteAndNotOnSubscribe(): Unit =
+ completedPublisherTest { pub ⇒
+ val latch = new Latch()
+ pub.subscribe {
+ new TestSubscriber[T] {
+ override def onComplete(): Unit = {
+ latch.assertOpen(s"Publisher $pub called `onComplete` twice on new Subscriber")
+ latch.close()
+ }
+ override def onSubscribe(subscription: Subscription): Unit =
+ flop(s"Publisher created by `createCompletedStatePublisher()` ($pub) called `onSubscribe` on new Subscriber")
+ }
+ }
+ latch.expectClose(timeoutMillis = 100, s"Publisher created by `createPublisher(0)` ($pub) did not call `onComplete` on new Subscriber")
+ Thread.sleep(100) // wait for the Publisher to potentially call 'onSubscribe' or `onNext` which would trigger an async error
+ }
+
+ // Publisher::subscribe(Subscriber)
+ // when Publisher is in `error` state
+ // must not call `onSubscribe` on the given Subscriber
+ // must trigger a call to `onError` on the given Subscriber
+ @Test
+ def publisherSubscribeWhenInErrorStateMustTriggerOnErrorAndNotOnSubscribe(): Unit =
+ errorPublisherTest { pub ⇒
+ val latch = new Latch()
+ pub.subscribe {
+ new TestSubscriber[T] {
+ override def onError(cause: Throwable): Unit = {
+ latch.assertOpen(s"Error-state Publisher $pub called `onError` twice on new Subscriber")
+ latch.close()
+ }
+ }
+ }
+ latch.expectClose(timeoutMillis = 100, s"Error-state Publisher $pub did not call `onError` on new Subscriber")
+ Thread.sleep(100) // wait for the Publisher to potentially call 'onSubscribe' or `onNext` which would trigger an async error
+ }
+
+ // Publisher::subscribe(Subscriber)
+ // when Publisher is in `shut-down` state
+ // must not call `onSubscribe` on the given Subscriber
+ // must trigger a call to `onError` with a `java.lang.IllegalStateException` on the given Subscriber
+ // Subscription::cancel
+ // when Subscription is not cancelled
+ // the Publisher must shut itself down if the given Subscription is the last downstream Subscription
+ @Test
+ def publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe(): Unit =
+ activePublisherTest(3) { pub ⇒
+ val sub = newManualSubscriber(pub)
+ sub.cancel()
+
+ // we cannot meaningfully test whether the publisher has really shut down
+ // however, we can tests whether it reacts to new subscription requests with `onError`
+ val latch = new Latch()
+ pub.subscribe {
+ new TestSubscriber[T] {
+ override def onError(cause: Throwable): Unit = {
+ latch.assertOpen(s"shut-down-state Publisher $pub called `onError` twice on new Subscriber")
+ latch.close()
+ }
+ }
+ }
+ latch.expectClose(timeoutMillis = 100, s"shut-down-state Publisher $pub did not call `onError` on new Subscriber")
+ Thread.sleep(100) // wait for the Publisher to potentially call 'onSubscribe' or `onNext` which would trigger an async error
+ }
+
+ // Publisher::subscribe(Subscriber)
+ // when Publisher is neither in `completed` nor `error` state
+ // must trigger a call to `onSubscribe` on the given Subscriber if the Subscription is to be accepted
+ @Test
+ def publisherSubscribeWhenActiveMustCallOnSubscribeFirst(): Unit =
+ activePublisherTest(elements = 1) { pub ⇒
+ val latch = new Latch()
+ pub.subscribe {
+ new TestSubscriber[T] {
+ override def onSubscribe(subscription: Subscription): Unit = latch.close()
+ }
+ }
+ latch.expectClose(timeoutMillis = 100, s"Active Publisher $pub did not call `onSubscribe` on new subscription request")
+ }
+
+ // Publisher::subscribe(Subscriber)
+ // when Publisher is neither in `completed` nor `error` state
+ // must trigger a call to `onError` on the given Subscriber if the Subscription is to be rejected
+ // must reject the Subscription if the same Subscriber already has an active Subscription
+ @Test
+ def publisherSubscribeWhenActiveMustRejectDoubleSubscription(): Unit =
+ activePublisherTest(elements = 1) { pub ⇒
+ val latch = new Latch()
+ val errorCause = new Promise[Throwable]
+ val sub = new TestSubscriber[T] {
+ override def onSubscribe(subscription: Subscription): Unit = latch.close()
+ override def onError(cause: Throwable): Unit = errorCause.complete(cause)
+ }
+ pub.subscribe(sub)
+ latch.expectClose(timeoutMillis = 100, s"Active Publisher $pub did not call `onSubscribe` on first subscription request")
+ errorCause.assertUncompleted(s"Active Publisher $pub unexpectedly called `onError` on first subscription request")
+
+ latch.reOpen()
+ pub.subscribe(sub)
+ errorCause.expectCompletion(timeoutMillis = 100, s"Active Publisher $pub did not call `onError` on double subscription request")
+ if (!errorCause.value.isInstanceOf[IllegalStateException])
+ flop(s"Publisher $pub called `onError` with ${errorCause.value} rather than an `IllegalStateException` on double subscription request")
+ latch.assertOpen(s"Active Publisher $pub unexpectedly called `onSubscribe` on double subscription request")
+ }
+
+ // Subscription::requestMore(Int)
+ // when Subscription is cancelled
+ // must ignore the call
+ @Test
+ def subscriptionRequestMoreWhenCancelledMustIgnoreTheCall(): Unit =
+ activePublisherTest(elements = 1) { pub ⇒
+ val sub = newManualSubscriber(pub)
+ sub.subscription.value.cancel()
+ sub.subscription.value.requestMore(1) // must not throw
+ }
+
+ // Subscription::requestMore(Int)
+ // when Subscription is not cancelled
+ // must register the given number of additional elements to be produced to the respective subscriber
+ // A Publisher
+ // must not call `onNext`
+ // more times than the total number of elements that was previously requested with Subscription::requestMore by the corresponding subscriber
+ @Test
+ def subscriptionRequestMoreMustResultInTheCorrectNumberOfProducedElements(): Unit =
+ activePublisherTest(elements = 5) { pub ⇒
+ val sub = newManualSubscriber(pub)
+
+ sub.expectNone(errorMsg = x ⇒ s"Publisher $pub produced $x before the first `requestMore`")
+ sub.requestMore(1)
+ sub.nextElement(errorMsg = s"Publisher $pub produced no element after first `requestMore`")
+ sub.expectNone(errorMsg = x ⇒ s"Publisher $pub produced unrequested $x")
+
+ sub.requestMore(1)
+ sub.requestMore(2)
+ sub.nextElements(3, timeoutMillis = 100, s"Publisher $pub produced less than 3 elements after two respective `requestMore` calls")
+
+ sub.expectNone(errorMsg = x ⇒ s"Publisher $pub produced unrequested $x")
+ }
+
+ // Subscription::requestMore(Int)
+ // when Subscription is not cancelled
+ // must throw a `java.lang.IllegalArgumentException` if the argument is <= 0
+ @Test
+ def subscriptionRequestMoreMustThrowIfArgumentIsNonPositive(): Unit =
+ activePublisherTest(elements = 1) { pub ⇒
+ val sub = newManualSubscriber(pub)
+ expectThrowingOf[IllegalArgumentException](s"Calling `requestMore(-1)` a subscription to $pub did not fail with an `IllegalArgumentException`") {
+ sub.subscription.value.requestMore(-1)
+ }
+ expectThrowingOf[IllegalArgumentException](s"Calling `requestMore(0)` a subscription to $pub did not fail with an `IllegalArgumentException`") {
+ sub.subscription.value.requestMore(0)
+ }
+ }
+
+ // Subscription::cancel
+ // when Subscription is cancelled
+ // must ignore the call
+ @Test
+ def subscriptionCancelWhenCancelledMustIgnoreCall(): Unit =
+ activePublisherTest(elements = 1) { pub ⇒
+ val sub = newManualSubscriber(pub)
+ sub.subscription.value.cancel() // first time must succeed
+ sub.subscription.value.cancel() // the second time must not throw
+ }
+
+ // Subscription::cancel
+ // when Subscription is not cancelled
+ // the Publisher must eventually cease to call any methods on the corresponding subscriber
+ @Test
+ def onSubscriptionCancelThePublisherMustEventuallyCeaseToCallAnyMethodsOnTheSubscriber(): Unit =
+ activePublisherTest(elements = 0) { pub ⇒ // infinite stream
+ var drop = true
+ val sub = new ManualSubscriber[T] with SubscriptionSupport[T] {
+ override def onNext(element: T): Unit = if (!drop) super.onNext(element)
+ }
+ subscribe(pub, sub)
+ sub.requestMore(Int.MaxValue)
+ sub.cancel()
+ Thread.sleep(100)
+
+ drop = false // "switch on" element collection
+ sub.expectNone(withinMillis = 100)
+ }
+
+ // Subscription::cancel
+ // when Subscription is not cancelled
+ // the Publisher must eventually drop any references to the corresponding subscriber
+ @Test
+ def onSubscriptionCancelThePublisherMustEventuallyDropAllReferencesToTheSubscriber(): Unit = {
+ def run(pub: Publisher[T]): Reference[ManualSubscriber[T]] = {
+ val sub = newManualSubscriber(pub)
+ val ref = new WeakReference(sub, new ReferenceQueue[ManualSubscriber[T]]())
+ sub.requestMore(1)
+ sub.nextElement()
+ sub.cancel()
+ ref
+ }
+ activePublisherTest(elements = 3) { pub ⇒
+ val ref = run(pub)
+ // cancel may be run asynchronously so we add a sleep before running the GC
+ // to "resolve" the race
+ Thread.sleep(200)
+ System.gc()
+
+ if (!ref.isEnqueued) // consider switching to remove(timeout) on the queue if required
+ flop(s"Publisher $pub did not drop reference to test subscriber after subscription cancellation")
+ }
+ }
+
+ // Subscription::cancel
+ // when Subscription is not cancelled
+ // the Publisher must obey the "a Subscription::cancel happens before any subsequent Publisher::subscribe" rule
+ @Test
+ def onSubscriptionCancelThePublisherMustObeyCancelHappensBeforeSubsequentSubscribe(): Unit = {
+ activePublisherTest(elements = 3) { pub ⇒
+ val keeper = newManualSubscriber(pub) // required to prevent the publisher from shutting down
+ val sub = newManualSubscriber(pub)
+ for (i ← 1 to 100) {
+ // try to cancel and resubscribe very quickly, in order to potentially trigger an "overtaking"
+ sub.cancel()
+ subscribe(pub, sub)
+ }
+ }
+ }
+
+ // A Publisher
+ // must not call `onNext`
+ // after having issued an `onComplete` or `onError` call on a subscriber
+ @Test
+ def mustNotCallOnNextAfterHavingIssuedAnOnCompleteOrOnErrorCallOnASubscriber(): Unit = {
+ // this is implicitly verified by the test infrastructure
+ }
+
+ // A Publisher
+ // must produce the same elements in the same sequence for all its subscribers
+ @Test
+ def mustProduceTheSameElementsInTheSameSequenceForAllItsSubscribers(): Unit =
+ activePublisherTest(elements = 5) { pub ⇒
+ val sub1 = newManualSubscriber(pub)
+ val sub2 = newManualSubscriber(pub)
+ val sub3 = newManualSubscriber(pub)
+
+ sub1.requestMore(1)
+ val x1 = sub1.nextElement(errorMsg = s"Publisher $pub did not produce the requested 1 element on 1st subscriber")
+ sub2.requestMore(2)
+ val y1 = sub2.nextElements(2, errorMsg = s"Publisher $pub did not produce the requested 2 elements on 2nd subscriber")
+ sub1.requestMore(1)
+ val x2 = sub1.nextElement(errorMsg = s"Publisher $pub did not produce the requested 1 element on 1st subscriber")
+ sub3.requestMore(3)
+ val z1 = sub3.nextElements(3, errorMsg = s"Publisher $pub did not produce the requested 3 elements on 3rd subscriber")
+ sub3.requestMore(1)
+ val z2 = sub3.nextElement(errorMsg = s"Publisher $pub did not produce the requested 1 element on 3rd subscriber")
+ sub3.requestMore(1)
+ val z3 = sub3.nextElement(errorMsg = s"Publisher $pub did not produce the requested 1 element on 3rd subscriber")
+ sub3.expectCompletion(errorMsg = s"Publisher $pub did not complete the stream as expected on 3rd subscriber")
+ sub2.requestMore(3)
+ val y2 = sub2.nextElements(3, errorMsg = s"Publisher $pub did not produce the requested 3 elements on 2nd subscriber")
+ sub2.expectCompletion(errorMsg = s"Publisher $pub did not complete the stream as expected on 2nd subscriber")
+ sub1.requestMore(2)
+ val x3 = sub1.nextElements(2, errorMsg = s"Publisher $pub did not produce the requested 2 elements on 1st subscriber")
+ sub1.requestMore(1)
+ val x4 = sub1.nextElement(errorMsg = s"Publisher $pub did not produce the requested 1 element on 1st subscriber")
+ sub1.expectCompletion(errorMsg = s"Publisher $pub did not complete the stream as expected on 1st subscriber")
+
+ val r = x1 :: x2 :: x3.toList ::: x4 :: Nil
+ assertEquals(r, y1.toList ::: y2.toList, s"Publisher $pub did not produce the same element sequence for subscribers 1 and 2")
+ assertEquals(r, z1.toList ::: z2 :: z3 :: Nil, s"Publisher $pub did not produce the same element sequence for subscribers 1 and 3")
+ }
+
+ // A Publisher
+ // must support a pending element count up to 2^63-1 (Long.MAX_VALUE) and provide for overflow protection
+ @Test
+ def mustSupportAPendingElementCountUpToLongMaxValue(): Unit = {
+ // not really testable without more control over the Publisher,
+ // we verify this part of the fanout logic with the IdentityProcessorVerification
+ }
+
+ // A Publisher
+ // must call `onComplete` on a subscriber after having produced the final stream element to it
+ // must call `onComplete` on a subscriber at the earliest possible point in time
+ @Test
+ def mustCallOnCompleteOnASubscriberAfterHavingProducedTheFinalStreamElementToIt(): Unit = {
+ activePublisherTest(elements = 3) { pub ⇒
+ val sub = newManualSubscriber(pub)
+ sub.requestNextElement()
+ sub.requestNextElement()
+ sub.requestNextElement()
+ sub.expectCompletion(errorMsg = s"Publisher $pub did not complete the stream immediately after the final element")
+ sub.expectNone()
+ }
+ }
+
+ // A Publisher
+ // must start producing with the oldest still available element for a new subscriber
+ @Test
+ def mustStartProducingWithTheOldestStillAvailableElementForASubscriber(): Unit = {
+ // can only be properly tested if we know more about the Producer implementation
+ // like buffer size and buffer retention logic
+ }
+
+ // A Publisher
+ // must call `onError` on all its subscribers if it encounters a non-recoverable error
+ @Test
+ def mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError(): Unit = {
+ // not really testable without more control over the Publisher,
+ // we verify this part of the fanout logic with the IdentityProcessorVerification
+ }
+
+ // A Publisher
+ // must not call `onComplete` or `onError` more than once per subscriber
+ @Test
+ def mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber(): Unit = {
+ // this is implicitly verified by the test infrastructure
+ }
+
+ /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
+
+ /////////////////////// TEST INFRASTRUCTURE //////////////////////
+
+ def activePublisherTest[U](elements: Int)(body: Publisher[T] ⇒ U): Unit = {
+ val pub = createPublisher(elements)
+ body(pub)
+ verifyNoAsyncErrors()
+ }
+
+ def completedPublisherTest[U](body: Publisher[T] ⇒ U): Unit =
+ potentiallyPendingTest(createCompletedStatePublisher(), body)
+
+ def errorPublisherTest[U](body: Publisher[T] ⇒ U): Unit =
+ potentiallyPendingTest(createErrorStatePublisher(), body)
+
+ def potentiallyPendingTest[U](pub: Publisher[T], body: Publisher[T] ⇒ U): Unit =
+ if (pub ne null) {
+ body(pub)
+ verifyNoAsyncErrors()
+ } // else test is pending/ignored, which our great Java test frameworks have no (non-static) concept for
+
+}
\ No newline at end of file
diff --git a/tck/src/main/scala/asyncrx/tck/SubscriberVerification.scala b/tck/src/main/scala/asyncrx/tck/SubscriberVerification.scala
new file mode 100644
index 00000000..77da20a6
--- /dev/null
+++ b/tck/src/main/scala/asyncrx/tck/SubscriberVerification.scala
@@ -0,0 +1,242 @@
+package asyncrx.tck
+
+import org.testng.annotations.Test
+import asyncrx.spi._
+
+trait SubscriberVerification[T] extends TestEnvironment {
+ import SubscriberVerification._
+
+ // TODO: make the timeouts be dilate-able so that one can tune the suite for the machine it runs on
+
+ /**
+ * This is the main method you must implement in your test incarnation.
+ * It must create a new Subscriber instance to be subjected to the testing logic.
+ *
+ * In order to be meaningfully testable your Subscriber must inform the given
+ * `SubscriberProbe` of the respective events having been received.
+ */
+ def createSubscriber(probe: SubscriberProbe[T]): Subscriber[T]
+
+ /**
+ * Helper method required for generating test elements.
+ * It must create a Publisher for a stream with exactly the given number of elements.
+ * If `elements` is zero the produced stream must be infinite.
+ */
+ def createHelperPublisher(elements: Int): Publisher[T]
+
+ ////////////////////// TEST SETUP VERIFICATION ///////////////////////////
+
+ @Test
+ def exerciseHappyPath(): Unit =
+ new TestSetup {
+ puppet.triggerRequestMore(1)
+ expectRequestMore(1)
+ sendNextTFromUpstream()
+ probe.expectNext(lastT)
+
+ puppet.triggerRequestMore(1)
+ expectRequestMore(1)
+ sendNextTFromUpstream()
+ probe.expectNext(lastT)
+
+ puppet.triggerCancel()
+ expectCancelling()
+
+ verifyNoAsyncErrors()
+ }
+
+ ////////////////////// SPEC RULE VERIFICATION ///////////////////////////
+
+ // Subscriber::onSubscribe(Subscription), Subscriber::onNext(T)
+ // must asynchronously schedule a respective event to the subscriber
+ // must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
+ @Test
+ def onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent(): Unit = {
+ // cannot be meaningfully tested, or can it?
+ }
+
+ // Subscriber::onComplete, Subscriber::onError(Throwable)
+ // must asynchronously schedule a respective event to the Subscriber
+ // must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
+ // must consider the Subscription cancelled after having received the event
+ @Test
+ def onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent(): Unit = {
+ // cannot be meaningfully tested, or can it?
+ }
+
+ // A Subscriber
+ // must not accept an `onSubscribe` event if it already has an active Subscription
+ @Test
+ def mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription(): Unit =
+ new TestSetup {
+ // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
+ sub.onSubscribe {
+ new Subscription {
+ def requestMore(elements: Int): Unit = flop(s"Subscriber $sub illegally called `subscription.requestMore($elements)`")
+ def cancel(): Unit = flop(s"Subscriber $sub illegally called `subscription.cancel()`")
+ }
+ }
+ verifyNoAsyncErrors()
+ }
+
+ // A Subscriber
+ // must call Subscription::cancel during shutdown if it still has an active Subscription
+ @Test
+ def mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription(): Unit =
+ new TestSetup {
+ puppet.triggerShutdown()
+ expectCancelling()
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Subscriber
+ // must ensure that all calls on a Subscription take place from the same thread or provide for respective external synchronization
+ @Test
+ def mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync(): Unit = {
+ // cannot be meaningfully tested, or can it?
+ }
+
+ // A Subscriber
+ // must be prepared to receive one or more `onNext` events after having called Subscription::cancel
+ @Test
+ def mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel(): Unit =
+ new TestSetup {
+ puppet.triggerRequestMore(1)
+ puppet.triggerCancel()
+ expectCancelling()
+ sendNextTFromUpstream()
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Subscriber
+ // must be prepared to receive an `onComplete` event with a preceding Subscription::requestMore call
+ @Test
+ def mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore(): Unit =
+ new TestSetup {
+ puppet.triggerRequestMore(1)
+ sendCompletion()
+ probe.expectCompletion()
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Subscriber
+ // must be prepared to receive an `onComplete` event without a preceding Subscription::requestMore call
+ @Test
+ def mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore(): Unit =
+ new TestSetup {
+ sendCompletion()
+ probe.expectCompletion()
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Subscriber
+ // must be prepared to receive an `onError` event with a preceding Subscription::requestMore call
+ @Test
+ def mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore(): Unit =
+ new TestSetup {
+ puppet.triggerRequestMore(1)
+ val ex = new RuntimeException("Test exception")
+ sendError(ex)
+ probe.expectError(ex)
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Subscriber
+ // must be prepared to receive an `onError` event without a preceding Subscription::requestMore call
+ @Test
+ def mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore(): Unit =
+ new TestSetup {
+ val ex = new RuntimeException("Test exception")
+ sendError(ex)
+ probe.expectError(ex)
+
+ verifyNoAsyncErrors()
+ }
+
+ // A Subscriber
+ // must make sure that all calls on its `onXXX` methods happen-before the processing of the respective events
+ @Test
+ def mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents(): Unit = {
+ // cannot be meaningfully tested, or can it?
+ }
+
+ /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
+
+ /////////////////////// TEST INFRASTRUCTURE //////////////////////
+
+ abstract class TestSetup extends ManualPublisher[T] {
+ val tees = newManualSubscriber(createHelperPublisher(0)) // gives us access to an infinite stream of T values
+ val probe = new Probe
+ var lastT: T = _
+ subscribe(createSubscriber(probe))
+ def sub = subscriber.get
+ probe.puppet.expectCompletion(timeoutMillis = 100, errorMsg = s"Subscriber $sub did not `registerOnSubscribe`")
+
+ def puppet: SubscriberPuppet = probe.puppet.value
+ def sendNextTFromUpstream(): Unit = sendNext(nextT())
+ def nextT(): T = {
+ lastT = tees.requestNextElement()
+ lastT
+ }
+
+ class Probe extends SubscriberProbe[T] {
+ val puppet = new Promise[SubscriberPuppet]
+ val elements = new Receptacle[T]
+ val completed = new Latch
+ val error = new Promise[Throwable]
+ def registerOnSubscribe(p: SubscriberPuppet): Unit =
+ if (!puppet.isCompleted) puppet.complete(p)
+ else flop(s"Subscriber $sub illegally accepted a second Subscription")
+ def registerOnNext(element: T): Unit = elements.add(element)
+ def registerOnComplete(): Unit = completed.close()
+ def registerOnError(cause: Throwable): Unit = error.complete(cause)
+ def expectNext(expected: T, timeoutMillis: Int = 100) = {
+ val received = elements.next(timeoutMillis, s"Subscriber $sub did not call `registerOnNext($expected)`")
+ if (received != expected)
+ flop(s"Subscriber $sub called `registerOnNext($received)` rather than `registerOnNext($expected)`")
+ }
+ def expectCompletion(timeoutMillis: Int = 100): Unit =
+ completed.expectClose(timeoutMillis, s"Subscriber $sub did not call `registerOnComplete()`")
+ def expectError(expected: Throwable, timeoutMillis: Int = 100): Unit = {
+ error.expectCompletion(timeoutMillis, s"Subscriber $sub did not call `registerOnError($expected)`")
+ if (error.value != expected)
+ flop(s"Subscriber $sub called `registerOnError(${error.value})` rather than `registerOnError($expected)`")
+ }
+ }
+ }
+}
+
+object SubscriberVerification {
+ trait SubscriberProbe[T] {
+ /**
+ * Must be called by the test subscriber when it has received the `onSubscribe` event.
+ */
+ def registerOnSubscribe(puppet: SubscriberPuppet): Unit
+
+ /**
+ * Must be called by the test subscriber when it has received an`onNext` event.
+ */
+ def registerOnNext(element: T): Unit
+
+ /**
+ * Must be called by the test subscriber when it has received an `onComplete` event.
+ */
+ def registerOnComplete(): Unit
+
+ /**
+ * Must be called by the test subscriber when it has received an `onError` event.
+ */
+ def registerOnError(cause: Throwable): Unit
+ }
+
+ trait SubscriberPuppet {
+ def triggerShutdown(): Unit
+ def triggerRequestMore(elements: Int): Unit
+ def triggerCancel(): Unit
+ }
+}
\ No newline at end of file
diff --git a/tck/src/main/scala/asyncrx/tck/TestEnvironment.scala b/tck/src/main/scala/asyncrx/tck/TestEnvironment.scala
new file mode 100644
index 00000000..044b07b9
--- /dev/null
+++ b/tck/src/main/scala/asyncrx/tck/TestEnvironment.scala
@@ -0,0 +1,225 @@
+package asyncrx.tck
+
+import java.util.concurrent._
+import org.testng.Assert.fail
+import scala.annotation.tailrec
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
+
+import asyncrx.spi._
+
+trait TestEnvironment {
+
+ val asyncErrors = new CopyOnWriteArrayList[Throwable]()
+
+ // don't use the name `fail` as it would collide with other `fail` definitions like the one in scalatest's traits
+ def flop(msg: String): Nothing =
+ try fail(msg).asInstanceOf[Nothing]
+ catch {
+ case t: Throwable ⇒
+ asyncErrors.add(t)
+ throw t
+ }
+
+ def expectThrowingOf[T <: Exception: ClassTag](errorMsg: String)(block: ⇒ Unit): Unit =
+ try {
+ block
+ flop(errorMsg)
+ } catch {
+ case e: Exception if implicitly[ClassTag[T]].runtimeClass.isInstance(e) ⇒ // ok
+ case NonFatal(e) ⇒ flop(s"$errorMsg but $e")
+ }
+
+ def subscribe[T](pub: Publisher[T], sub: TestSubscriber[T], timeoutMillis: Int = 100): Unit = {
+ pub.subscribe(sub)
+ sub.subscription.expectCompletion(timeoutMillis, s"Could not subscribe $sub to Publisher $pub")
+ verifyNoAsyncErrors()
+ }
+
+ def newManualSubscriber[T](pub: Publisher[T], timeoutMillis: Int = 100): ManualSubscriber[T] = {
+ val sub = new ManualSubscriber[T] with SubscriptionSupport[T]
+ subscribe(pub, sub, timeoutMillis)
+ sub
+ }
+
+ def verifyNoAsyncErrors(): Unit =
+ asyncErrors.asScala.foreach {
+ case e: AssertionError ⇒ throw e
+ case e ⇒ fail("Async error during test execution: " + e)
+ }
+
+ class TestSubscriber[T] extends Subscriber[T] {
+ @volatile var subscription = new Promise[Subscription]
+ def onError(cause: Throwable): Unit = flop(s"Unexpected Subscriber::onError($cause)")
+ def onComplete(): Unit = flop("Unexpected Subscriber::onComplete()")
+ def onNext(element: T): Unit = flop(s"Unexpected Subscriber::onNext($element)")
+ def onSubscribe(subscription: Subscription): Unit = flop(s"Unexpected Subscriber::onSubscribe($subscription)")
+ def cancel(): Unit =
+ if (subscription.isCompleted) {
+ subscription.value.cancel()
+ subscription = new Promise[Subscription]
+ } else flop("Cannot cancel a subscription before having received it")
+ }
+
+ class ManualSubscriber[T] extends TestSubscriber[T] {
+ val received = new Receptacle[T]
+ override def onNext(element: T) = received.add(element)
+ override def onComplete() = received.complete()
+ def requestMore(elements: Int): Unit = subscription.value.requestMore(elements)
+ def requestNextElement(timeoutMillis: Int = 100, errorMsg: String = "Did not receive expected element"): T = {
+ requestMore(1)
+ nextElement(timeoutMillis, errorMsg)
+ }
+ def requestNextElementOrEndOfStream(timeoutMillis: Int = 100, errorMsg: String = "Did not receive expected stream completion"): Option[T] = {
+ requestMore(1)
+ nextElementOrEndOfStream(timeoutMillis, errorMsg)
+ }
+ def requestNextElements(elements: Int, timeoutMillis: Int = 100, errorMsg: String = "Did not receive expected elements"): Seq[T] = {
+ requestMore(elements)
+ nextElements(elements, timeoutMillis, errorMsg)
+ }
+ def nextElement(timeoutMillis: Int = 100, errorMsg: String = "Did not receive expected element"): T =
+ received.next(timeoutMillis, errorMsg)
+ def nextElementOrEndOfStream(timeoutMillis: Int = 100, errorMsg: String = "Did not receive expected stream completion"): Option[T] =
+ received.nextOrEndOfStream(timeoutMillis, errorMsg)
+ def nextElements(elements: Int, timeoutMillis: Int = 100, errorMsg: String = "Did not receive expected element or completion"): Seq[T] =
+ received.nextN(elements, timeoutMillis, errorMsg)
+ def expectNext(expected: T, timeoutMillis: Int = 100): Unit = {
+ val received = nextElement(timeoutMillis, "Did not receive expected element on downstream")
+ if (received != expected) flop(s"Expected element $expected on downstream but received $received")
+ }
+ def expectCompletion(timeoutMillis: Int = 100, errorMsg: String = "Did not receive expected stream completion"): Unit =
+ received.expectCompletion(timeoutMillis, errorMsg)
+ def expectNone(withinMillis: Int = 100, errorMsg: T ⇒ String = "Did not expect an element but got " + _): Unit =
+ received.expectNone(withinMillis, errorMsg)
+ }
+
+ trait SubscriptionSupport[T] extends TestSubscriber[T] {
+ abstract override def onNext(element: T) =
+ if (subscription.isCompleted) super.onNext(element)
+ else flop(s"Subscriber::onNext($element) called before Subscriber::onSubscribe")
+ abstract override def onComplete() =
+ if (subscription.isCompleted) super.onComplete()
+ else flop("Subscriber::onComplete() called before Subscriber::onSubscribe")
+ abstract override def onSubscribe(s: Subscription) =
+ if (!subscription.isCompleted) subscription.complete(s)
+ else flop("Subscriber::onSubscribe called on an already-subscribed Subscriber")
+ abstract override def onError(cause: Throwable) =
+ if (subscription.isCompleted) super.onError(cause)
+ else flop(s"Subscriber::onError($cause) called before Subscriber::onSubscribe")
+ }
+
+ trait ErrorCollection[T] extends TestSubscriber[T] {
+ val error = new Promise[Throwable]
+ override def onError(cause: Throwable): Unit = error.complete(cause)
+ def expectError(cause: Throwable, timeoutMillis: Int = 100): Unit = {
+ error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream")
+ if (error.value != cause) flop(s"Expected error $cause but got ${error.value}")
+ }
+ }
+
+ class ManualPublisher[T] extends Publisher[T] {
+ var subscriber: Option[Subscriber[T]] = None
+ val requests = new Receptacle[Int]()
+ val cancelled = new Latch
+ def subscribe(s: Subscriber[T]): Unit =
+ if (subscriber.isEmpty) {
+ subscriber = Some(s)
+ s.onSubscribe {
+ new Subscription {
+ def requestMore(elements: Int): Unit = requests.add(elements)
+ def cancel(): Unit = cancelled.close()
+ }
+ }
+ } else flop("TestPublisher doesn't support more than one Subscriber")
+ def sendNext(element: T): Unit =
+ if (subscriber.isDefined) subscriber.get.onNext(element)
+ else flop(s"Cannot sendNext before subscriber subscription")
+ def sendCompletion(): Unit =
+ if (subscriber.isDefined) subscriber.get.onComplete()
+ else flop(s"Cannot sendCompletion before subscriber subscription")
+ def sendError(cause: Throwable): Unit =
+ if (subscriber.isDefined) subscriber.get.onError(cause)
+ else flop(s"Cannot sendError before subscriber subscription")
+ def nextRequestMore(timeoutMillis: Int = 100): Int =
+ requests.next(timeoutMillis, "Did not receive expected `requestMore` call")
+ def expectRequestMore(expected: Int, timeoutMillis: Int = 100): Unit = {
+ val requested = nextRequestMore(timeoutMillis)
+ if (requested != expected) flop(s"Received `requestMore($requested)` on upstream but expected `requestMore($expected)`")
+ }
+ def expectNoRequestMore(timeoutMillis: Int = 100): Unit =
+ requests.expectNone(timeoutMillis, "Received an unexpected `requestMore" + _ + "` call")
+ def expectCancelling(timeoutMillis: Int = 100): Unit =
+ cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription")
+ }
+
+ // like a CountDownLatch, but resettable and with some convenience methods
+ class Latch() {
+ @volatile private var countDownLatch = new CountDownLatch(1)
+ def reOpen() = countDownLatch = new CountDownLatch(1)
+ def isClosed: Boolean = countDownLatch.getCount == 0
+ def close() = countDownLatch.countDown()
+ def assertClosed(openErrorMsg: String): Unit = if (!isClosed) flop(openErrorMsg)
+ def assertOpen(closedErrorMsg: String): Unit = if (isClosed) flop(closedErrorMsg)
+ def expectClose(timeoutMillis: Int, notClosedErrorMsg: String): Unit = {
+ countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS)
+ if (countDownLatch.getCount > 0) flop(s"$notClosedErrorMsg within $timeoutMillis ms")
+ }
+ }
+
+ // simple promise for *one* value, which cannot be reset
+ class Promise[T] {
+ private val abq = new ArrayBlockingQueue[T](1)
+ @volatile private var _value: T = _
+ def value: T = if (isCompleted) _value else flop("Cannot access promise value before completion")
+ def isCompleted: Boolean = _value.asInstanceOf[AnyRef] ne null
+ def complete(value: T): Unit = abq.add(value)
+ def assertCompleted(errorMsg: String): Unit = if (!isCompleted) flop(errorMsg)
+ def assertUncompleted(errorMsg: String): Unit = if (isCompleted) flop(errorMsg)
+ def expectCompletion(timeoutMillis: Int, errorMsg: String): Unit =
+ if (!isCompleted) abq.poll(timeoutMillis, TimeUnit.MILLISECONDS) match {
+ case null ⇒ flop(s"$errorMsg within $timeoutMillis ms")
+ case x ⇒ _value = x
+ }
+ }
+
+ // a "Promise" for multiple values, which also supports "end-of-stream reached"
+ class Receptacle[T](val queueSize: Int = 16) {
+ private val abq = new ArrayBlockingQueue[Option[T]](queueSize)
+ def add(value: T): Unit = abq.add(Some(value))
+ def complete(): Unit = abq.add(None)
+ def next(timeoutMillis: Int, errorMsg: String): T =
+ abq.poll(timeoutMillis, TimeUnit.MILLISECONDS) match {
+ case null ⇒ flop(s"$errorMsg within $timeoutMillis ms")
+ case Some(x) ⇒ x
+ case None ⇒ flop(s"Expected element but got end-of-stream")
+ }
+ def nextOrEndOfStream(timeoutMillis: Int, errorMsg: String): Option[T] =
+ abq.poll(timeoutMillis, TimeUnit.MILLISECONDS) match {
+ case null ⇒ flop(s"$errorMsg within $timeoutMillis ms")
+ case x ⇒ x
+ }
+ def nextN(elements: Int, timeoutMillis: Int, errorMsg: String): Seq[T] = {
+ @tailrec def rec(remaining: Int, result: Seq[T] = Nil): Seq[T] =
+ if (remaining > 0)
+ rec(remaining - 1, result :+ next(timeoutMillis, errorMsg)) // TODO: fix error messages showing wrong timeout info
+ else result
+ rec(elements)
+ }
+ def expectCompletion(timeoutMillis: Int, errorMsg: String): Unit =
+ abq.poll(timeoutMillis, TimeUnit.MILLISECONDS) match {
+ case null ⇒ flop(s"$errorMsg within $timeoutMillis ms")
+ case Some(x) ⇒ flop(s"Expected end-of-stream but got $x")
+ case None ⇒ // ok
+ }
+ def expectNone(withinMillis: Int, errorMsg: T ⇒ String): Unit = {
+ Thread.sleep(withinMillis)
+ abq.poll() match {
+ case null ⇒ // ok
+ case Some(x) ⇒ flop(errorMsg(x))
+ case None ⇒ flop("Expected no element but got end-of-stream")
+ }
+ }
+ }
+}
\ No newline at end of file