Skip to content
This repository was archived by the owner on Jun 27, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ before_install:
- chmod +x gradlew

script:
- export GRADLE_OPTS=-Xmx1024m
- ./gradlew assemble --stacktrace
- ./gradlew check jacocoFullReport --stacktrace

Expand Down
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ Preview for version 3 of RxJava, the modern ReactiveX style library for composin

```groovy
// shared components
compile "com.github.akarnokd:rxjava3-common:0.1.0"
compile "com.github.akarnokd:rxjava3-common:0.2.0"

// Flowable only
compile "com.github.akarnokd:rxjava3-flowable:0.1.0"
compile "com.github.akarnokd:rxjava3-flowable:0.2.0"

// Observable, Single, Maybe, Completable
compile "com.github.akarnokd:rxjava3-observable:0.1.0"
compile "com.github.akarnokd:rxjava3-observable:0.2.0"

// Interoperation between Flowable and the rest
compile "com.github.akarnokd:rxjava3-interop:0.1.0"
compile "com.github.akarnokd:rxjava3-interop:0.2.0"
```

## Structure
Expand All @@ -45,4 +45,11 @@ This is an unofficial preparation place for RxJava 3 where the major change is t
- dependencies: **rxjava3-commons**
- `rxjava3-interop`
- transformers and converters between the backpressured `Flowable` and the non-backpressured `Observable` types
- dependencies: **rxjava3-flowable**, **rxjava3-observable**, (-> **rxjava3-commons**, **reactive-streams-extensions**, **reactive-streams**)
- dependencies: **rxjava3-flowable**, **rxjava3-observable**, (-> **rxjava3-commons**, **reactive-streams-extensions**, **reactive-streams**)


## TODOs

### Work out how the snapshot release and final release works in RxJava 1/2's Nebula plugin

Currently, this preview releases manually and not in response to merging or hitting the GitHub release button. I don't really know which and how the unsupported Nebula plugin works and if it supports Gradle subprojects. Also due to the encrypted credentials, such auto-release must happen from within ReactiveX/RxJava.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ subprojects {
check.dependsOn testng

task GCandMem(dependsOn: 'check') << {
print("Memory usage before GC: ")
println(java.lang.management.ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024.0 / 1024.0)
System.gc()
Thread.sleep(200)
print("Memory usage: ")
Expand Down
37 changes: 18 additions & 19 deletions common/src/main/java/io/reactivex/common/RxJavaCommonPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package io.reactivex.common;

import io.reactivex.common.annotations.Experimental;
import io.reactivex.common.annotations.NonNull;
import io.reactivex.common.annotations.Nullable;
import io.reactivex.common.exceptions.*;
Expand Down Expand Up @@ -98,10 +97,10 @@ public static boolean isLockdown() {
* Enables or disables the blockingX operators to fail
* with an IllegalStateException on a non-blocking
* scheduler such as computation or single.
* <p>History: 2.0.5 - experimental
* @param enable enable or disable the feature
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static void setFailOnNonBlockingScheduler(boolean enable) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
Expand All @@ -113,10 +112,10 @@ public static void setFailOnNonBlockingScheduler(boolean enable) {
* Returns true if the blockingX operators fail
* with an IllegalStateException on a non-blocking scheduler
* such as computation or single.
* <p>History: 2.0.5 - experimental
* @return true if the blockingX operators fail on a non-blocking scheduler
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static boolean isFailOnNonBlockingScheduler() {
return failNonBlockingScheduler;
}
Expand Down Expand Up @@ -566,11 +565,11 @@ public static void setSingleSchedulerHandler(@Nullable Function<? super Schedule
* such as awaiting a condition or signal
* and should return true to indicate the operator
* should not block but throw an IllegalArgumentException.
* <p>History: 2.0. - experimental
* @return true if the blocking should be prevented
* @see #setFailOnNonBlockingScheduler(boolean)
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static boolean onBeforeBlocking() {
BooleanSupplier f = onBeforeBlocking;
if (f != null) {
Expand All @@ -587,12 +586,12 @@ public static boolean onBeforeBlocking() {
* Set the handler that is called when an operator attempts a blocking
* await; the handler should return true to prevent the blocking
* and to signal an IllegalStateException instead.
* <p>History: 2.0.5 - experimental
* @param handler the handler to set, null resets to the default handler
* that always returns false
* @see #onBeforeBlocking()
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
Expand All @@ -603,10 +602,10 @@ public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) {
/**
* Returns the current blocking handler or null if no custom handler
* is set.
* <p>History: 2.0.5 - experimental
* @return the current blocking handler or null if not specified
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@Nullable
public static BooleanSupplier getOnBeforeBlocking() {
return onBeforeBlocking;
Expand All @@ -615,12 +614,12 @@ public static BooleanSupplier getOnBeforeBlocking() {
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createComputationScheduler(@NonNull ThreadFactory threadFactory) {
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand All @@ -629,12 +628,12 @@ public static Scheduler createComputationScheduler(@NonNull ThreadFactory thread
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory) {
return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand All @@ -643,12 +642,12 @@ public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory)
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFactory) {
return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand All @@ -657,12 +656,12 @@ public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFa
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFactory) {
return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/io/reactivex/common/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public static Scheduler single() {
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
* <code><pre>
* <pre><code>
* ExecutorService exec = Executors.newSingleThreadedExecutor();
* try {
* Scheduler scheduler = Schedulers.from(exec);
Expand All @@ -324,7 +324,7 @@ public static Scheduler single() {
* } finally {
* exec.shutdown();
* }
* </pre></code>
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link io.reactivex.common.Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
Expand Down
47 changes: 23 additions & 24 deletions common/src/main/java/io/reactivex/common/TestConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.*;
import java.util.concurrent.*;

import io.reactivex.common.annotations.Experimental;
import io.reactivex.common.exceptions.CompositeException;
import io.reactivex.common.functions.Predicate;
import io.reactivex.common.internal.functions.*;
Expand Down Expand Up @@ -336,11 +335,11 @@ public final U assertValue(T value) {
* Assert that this TestObserver/TestObserver did not receive an onNext value which is equal to
* the given value with respect to Objects.equals.
*
* @since 2.0.5 - experimental
* <p>History: 2.0.5 - experimental
* @since 2.1
* @param value the value to expect not being received
* @return this;
*/
@Experimental
@SuppressWarnings("unchecked")
public final U assertNever(T value) {
int s = values.size();
Expand Down Expand Up @@ -377,12 +376,12 @@ public final U assertValue(Predicate<T> valuePredicate) {
* Asserts that this TestObserver/TestObserver did not receive any onNext value for which
* the provided predicate returns true.
*
* @since 2.0.5 - experimental
* <p>History: 2.0.5 - experimental
* @since 2.1
* @param valuePredicate the predicate that receives the onNext value
* and should return true for the expected value.
* @return this
*/
@Experimental
@SuppressWarnings("unchecked")
public final U assertNever(Predicate<? super T> valuePredicate) {
int s = values.size();
Expand Down Expand Up @@ -548,7 +547,7 @@ public final U assertValueSequence(Iterable<? extends T> sequence) {
throw fail("More values received than expected (" + i + ")");
}
if (expectedNext) {
throw fail("Fever values received than expected (" + i + ")");
throw fail("Fewer values received than expected (" + i + ")");
}
return (U)this;
}
Expand Down Expand Up @@ -780,12 +779,12 @@ public final U assertEmpty() {
/**
* Set the tag displayed along with an assertion failure's
* other state information.
* <p>History: 2.0.7 - experimental
* @param tag the string to display (null won't print any tag)
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U withTag(CharSequence tag) {
this.tag = tag;
return (U)this;
Expand All @@ -794,9 +793,9 @@ public final U withTag(CharSequence tag) {
/**
* Enumeration of default wait strategies when waiting for a specific number of
* items in {@link TestConsumer#awaitCount(int, Runnable)}.
* @since 2.0.7 - experimental
* <p>History: 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public enum TestWaitStrategy implements Runnable {
/** The wait loop will spin as fast as possible. */
SPIN {
Expand Down Expand Up @@ -859,12 +858,12 @@ static void sleep(int millis) {
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates by sleeping 10 milliseconds at a time
* up to 5000 milliseconds of timeout.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @return this
* @see #awaitCount(int, Runnable, long)
* @since 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public final U awaitCount(int atLeast) {
return awaitCount(atLeast, TestWaitStrategy.SLEEP_10MS, 5000);
}
Expand All @@ -873,23 +872,24 @@ public final U awaitCount(int atLeast) {
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates by waiting according to the wait
* strategy and up to 5000 milliseconds of timeout.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
* no terminal event either, see {@link TestWaitStrategy}
* for examples
* @return this
* @see #awaitCount(int, Runnable, long)
* @since 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public final U awaitCount(int atLeast, Runnable waitStrategy) {
return awaitCount(atLeast, waitStrategy, 5000);
}

/**
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
Expand All @@ -898,10 +898,9 @@ public final U awaitCount(int atLeast, Runnable waitStrategy) {
* @param timeoutMillis if positive, the await ends if the specified amount of
* time has passed no matter how many items were received
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) {
long start = System.currentTimeMillis();
for (;;) {
Expand All @@ -922,37 +921,37 @@ public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis
}

/**
* <p>History: 2.0.7 - experimental
* @return true if one of the timeout-based await methods has timed out.
* @see #clearTimeout()
* @see #assertTimeout()
* @see #assertNoTimeout()
* @since 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public final boolean isTimeout() {
return timeout;
}

/**
* Clears the timeout flag set by the await methods when they timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.0.7 - experimental
* @since 2.1
* @see #isTimeout()
*/
@SuppressWarnings("unchecked")
@Experimental
public final U clearTimeout() {
timeout = false;
return (U)this;
}

/**
* Asserts that some awaitX method has timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U assertTimeout() {
if (!timeout) {
throw fail("No timeout?!");
Expand All @@ -963,11 +962,11 @@ public final U assertTimeout() {

/**
* Asserts that some awaitX method has not timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U assertNoTimeout() {
if (timeout) {
throw fail("Timeout?!");
Expand Down
Loading