
NewThreadScheduler thread exhaustion #632

@akarnokd

The current implementation of NewThreadScheduler creates a new ThreadPoolExecutor for each non-recursive schedule invocation. The pool is never shut down (except when the application terminates), so a pool that is no longer in use still keeps one thread alive, blocked waiting for a next task that can never arrive. If too many pools are created, the operating system can run out of threads, and thread creation then fails throughout the application with "native thread creation failed". Here is a small test case that demonstrates the problem:

public class NewThreadSchedulerTest {
    @Test
    public void testExhaustion() throws InterruptedException {
        Scheduler s = Schedulers.newThread();
        BlockingObservable<Integer> source = Observable.from(1)
                .subscribeOn(s).toBlockingObservable();
        for (int i = 0; i < 10000; i++) {
            Assert.assertEquals((Integer)1, source.single());
        }
    }
}

(This test fails on my Windows 7 x86 machine after about 4000 iterations.)
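The same effect can be reproduced without RxJava at all. The following is only a minimal sketch using plain java.util.concurrent, not the scheduler's actual code: the class name IdlePoolLeakDemo and the method countIdleWorkers are made up for illustration. It creates one single-threaded ThreadPoolExecutor per task, as described above, and then counts how many worker threads are still alive once every task has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it mimics "one pool per schedule call" with plain executors.
public class IdlePoolLeakDemo {

    // Runs one trivial task on each of poolCount fresh pools and returns the
    // number of worker threads that are still alive after all tasks completed.
    static int countIdleWorkers(int poolCount) throws InterruptedException {
        List<ThreadPoolExecutor> pools = new ArrayList<>();
        final Thread[] workers = new Thread[poolCount];
        CountDownLatch done = new CountDownLatch(poolCount);

        for (int i = 0; i < poolCount; i++) {
            final int index = i;
            // One core thread, no core-thread timeout: the worker parks on the
            // queue forever once its single task is done.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 1, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            pools.add(pool);
            pool.execute(() -> {
                workers[index] = Thread.currentThread();
                done.countDown();
            });
        }

        done.await();          // every task has run
        Thread.sleep(200);     // give workers a chance to exit, if they were going to

        int alive = 0;
        for (Thread t : workers) {
            if (t.isAlive()) {
                alive++;
            }
        }

        // Clean up so the demo itself does not leak; the scheduler never does this.
        for (ThreadPoolExecutor pool : pools) {
            pool.shutdown();
        }
        return alive;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("idle workers still alive: " + countIdleWorkers(50));
    }
}
```

With 50 pools I would expect all 50 workers to be reported as still alive, since a core thread does not time out by default. Scaling the count up far enough should eventually hit the same native thread limit as the test above.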

The equivalent Rx.NET code, however, works fine:

class Program {
    static void Main(string[] args) {
        IScheduler s = NewThreadScheduler.Default;
        IObservable<int> source = Observable.ToObservable(new int[] { 1 }, s)
                .SubscribeOn(s);
        int sum = 0;
        for (int i = 0; i < 10000; i++) {
            sum += source.Single();
            Thread.Sleep(1);
            if (i % 256 == 0) {
                Console.Write(i);
                Console.Write(": ");
                Console.WriteLine(sum);
            }
        }
    }
}

The program's thread count stays at about 15. (Rx.NET seems to use plain threads combined with some kind of CurrentThreadScheduler, and has an explicit option to let the thread quit when no more tasks are queued or pending.)
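To make the "plain thread that quits when nothing is queued" idea concrete, here is a rough Java sketch. It is my guess at the general shape only, not Rx.NET's implementation and not a proposed patch: the class DrainingWorker, its schedule method and the lastThread field are all hypothetical names. A thread is started when the first task arrives, drains the queue, and exits once the pending-task counter drops to zero; a later task simply starts a fresh thread.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a worker whose thread exits when no tasks are pending.
public class DrainingWorker {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    // Number of tasks that were queued but have not finished running yet.
    private final AtomicInteger wip = new AtomicInteger();
    // Exposed only so the demo can observe the thread; not needed otherwise.
    volatile Thread lastThread;

    public void schedule(Runnable task) {
        queue.offer(task);                    // enqueue first, then count
        if (wip.getAndIncrement() == 0) {
            // No drain loop is active: start a fresh thread for this burst of work.
            Thread t = new Thread(this::drain, "draining-worker");
            lastThread = t;
            t.start();
        }
    }

    private void drain() {
        do {
            // Never null here: every increment of wip is preceded by an offer.
            Runnable task = queue.poll();
            // A real implementation would need error handling around run();
            // an exception here would leave wip stuck above zero.
            task.run();
        } while (wip.decrementAndGet() != 0);
        // Counter reached zero: the thread returns and terminates.
    }

    public static void main(String[] args) throws InterruptedException {
        DrainingWorker worker = new DrainingWorker();
        worker.schedule(() -> System.out.println("task ran on " + Thread.currentThread().getName()));
        Thread t = worker.lastThread;
        t.join();
        System.out.println("worker thread alive after drain: " + t.isAlive());
    }
}
```

Tasks still run one at a time, because a new thread is only started after the counter has gone back to zero. The obvious trade-off is that consecutive bursts of work run on different threads.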

I've tried to fix NewThreadScheduler by setting a keep-alive time on the thread pool, but unfortunately that breaks OperationParallelMergeTest, because a restarted thread in the thread pool gets a different identifier. I've also tried adding a wip (work-in-progress) counter to detect when to terminate the pool, but that didn't work either (probably because an escaped scheduler reference gets invoked outside the callback function/action).
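For reference, the keep-alive side effect can be shown in isolation with a plain executor. This is only a sketch under my own assumptions (a 50 ms keep-alive, and the made-up names KeepAliveIdentityDemo and sameThreadAfterIdle), not the change I actually tried in NewThreadScheduler. Once the core thread is allowed to time out, a task submitted after an idle period runs on a newly created thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a pool with a core-thread timeout replaces its worker after idling.
public class KeepAliveIdentityDemo {

    // Returns true if two tasks separated by an idle period ran on the same Thread object.
    static boolean sameThreadAfterIdle() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS,   // 50 ms keep-alive is an arbitrary choice
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);          // let the single core thread retire when idle
        try {
            Callable<Thread> whoAmI = Thread::currentThread;
            Thread first = pool.submit(whoAmI).get();
            first.join(5000);                       // wait for the idle worker to time out and exit
            Thread second = pool.submit(whoAmI).get();
            return first == second;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("same thread after idle period: " + sameThreadAfterIdle());
    }
}
```

The idle threads no longer pile up this way, but anything that relies on a scheduler staying on one particular thread sees a different thread after each idle period, which is what trips up the test mentioned above.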

Any ideas?
