GroupBy with flatMap and observeOn does not seem to support backpressure in 1.2.x #5029

@sbarlabanov

There are plenty of issues that fix various backpressure problems in groupBy (e.g. #3428, #3425).
Nevertheless, when I run the following code, all 10000 values are requested from the source at once, without waiting for individual items to be processed (example run with RxJava 1.2.x):

import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.Random;

import static rx.Observable.range;

public class GroupByTest {
    public static void main(String[] args) throws Exception {
        Observable<Integer> source = range(1, 10000);
        source
                .doOnNext(i -> System.out.println("Requested " + i))
                .groupBy(v -> v % 5)
                .flatMap(g -> g.observeOn(Schedulers.io()).map(GroupByTest::calculation))
                .subscribe(i -> System.out.println("Got " + i));
        Thread.sleep(100000);
    }

    private static Integer calculation(Integer i) {
        sleep();
        System.out.println("Processing " + i);
        return i * 20;
    }

    private static void sleep() {
        try {
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

When running the code, I see 10000 "Requested X" messages printed immediately, followed by "Processing X" messages that arrive after a delay.
With this behavior, it does not seem possible to parallelize the processing of individual groups in RxJava in any useful way when working with large or unbounded sources.
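
For comparison, here is a minimal plain-Java sketch of the bounded behavior I would expect: the producer may only run a fixed number of items ahead of each group's consumer. It does not use RxJava and says nothing about how groupBy works internally. The class name `BoundedGroupDemo`, the `POISON` marker, and the `capacity` parameter are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class BoundedGroupDemo {
    private static final int POISON = -1; // hypothetical end-of-stream marker

    /** Returns {max items emitted but not yet processed, items processed, sum of results}. */
    static long[] run(int total, int groups, int capacity) {
        List<BlockingQueue<Integer>> queues = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();
        AtomicInteger outstanding = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        AtomicLong sum = new AtomicLong();

        // One bounded queue and one worker thread per group.
        for (int g = 0; g < groups; g++) {
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
            queues.add(queue);
            Thread worker = new Thread(() -> {
                try {
                    for (int v = queue.take(); v != POISON; v = queue.take()) {
                        sum.addAndGet(v * 20L); // stand-in for calculation(...)
                        processed.incrementAndGet();
                        outstanding.decrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        int maxOutstanding = 0;
        try {
            for (int i = 1; i <= total; i++) {
                queues.get(i % groups).put(i); // blocks while this group's queue is full
                maxOutstanding = Math.max(maxOutstanding, outstanding.incrementAndGet());
            }
            for (BlockingQueue<Integer> queue : queues) {
                queue.put(POISON); // tell each worker to stop
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while feeding groups", e);
        }
        return new long[] {maxOutstanding, processed.get(), sum.get()};
    }

    public static void main(String[] args) {
        long[] r = run(10000, 5, 4);
        System.out.println("max outstanding = " + r[0] + ", processed = " + r[1]);
    }
}
```

In this sketch the number of emitted-but-unprocessed items can never exceed groups * (capacity + 1), regardless of how large the source is. That is the kind of bound I was hoping to get from groupBy combined with flatMap and observeOn.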
