Skip to content

test_dont_steal_unknown_functions failure #3574

@jrbourbeau

Description

@jrbourbeau

xref https://travis-ci.org/github/dask/distributed/jobs/662107052

Full traceback:
______________________ test_dont_steal_unknown_functions _______________________

    def test_func():

        result = None

        workers = []

        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

    

            async def coro():

                with dask.config.set(config):

                    s = False

                    for i in range(5):

                        try:

                            s, ws = await start_cluster(

                                nthreads,

                                scheduler,

                                loop,

                                security=security,

                                Worker=Worker,

                                scheduler_kwargs=scheduler_kwargs,

                                worker_kwargs=worker_kwargs,

                            )

                        except Exception as e:

                            logger.error(

                                "Failed to start gen_cluster, retrying",

                                exc_info=True,

                            )

                        else:

                            workers[:] = ws

                            args = [s] + workers

                            break

                    if s is False:

                        raise Exception("Could not start cluster")

                    if client:

                        c = await Client(

                            s.address,

                            loop=loop,

                            security=security,

                            asynchronous=True,

                            **client_kwargs

                        )

                        args = [c] + args

                    try:

                        future = func(*args)

                        if timeout:

                            future = asyncio.wait_for(future, timeout)

                        result = await future

                        if s.validate:

                            s.validate_state()

                    finally:

                        if client and c.status not in ("closing", "closed"):

                            await c._close(fast=s.status == "closed")

                        await end_cluster(s, workers)

                        await asyncio.wait_for(cleanup_global_workers(), 1)

    

                    try:

                        c = await default_client()

                    except ValueError:

                        pass

                    else:

                        await c._close(fast=True)

    

                    for i in range(5):

                        if all(c.closed() for c in Comm._instances):

                            break

                        else:

                            await asyncio.sleep(0.05)

                    else:

                        L = [c for c in Comm._instances if not c.closed()]

                        Comm._instances.clear()

                        # raise ValueError("Unclosed Comms", L)

                        print("Unclosed Comms", L)

    

                    return result

    

            result = loop.run_sync(

>               coro, timeout=timeout * 2 if timeout else timeout

            )

distributed/utils_test.py:957: 

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

../../../miniconda/envs/test-environment/lib/python3.6/site-packages/tornado/ioloop.py:576: in run_sync

    return future_cell[0].result()

distributed/utils_test.py:927: in coro

    result = await future

../../../miniconda/envs/test-environment/lib/python3.6/asyncio/tasks.py:358: in wait_for

    return fut.result()

../../../miniconda/envs/test-environment/lib/python3.6/site-packages/tornado/gen.py:1147: in run

    yielded = self.gen.send(value)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <Client: not connected>

s = <Scheduler: "tcp://127.0.0.1:43315" processes: 0 cores: 0>

a = <Worker: 'tcp://127.0.0.1:35456', 0, closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>

b = <Worker: 'tcp://127.0.0.1:42222', 1, closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)

    def test_dont_steal_unknown_functions(c, s, a, b):

        futures = c.map(inc, [1, 2], workers=a.address, allow_other_workers=True)

        yield wait(futures)

>       assert len(a.data) == 2, [len(a.data), len(b.data)]

E       AssertionError: [1, 1]

E       assert 1 == 2

E        +  where 1 = len(Buffer<<LRU: 28/5023005081 on dict>, <Func: serialize_bytelist<->deserialize_bytes <File: /home/travis/build/dask/distributed/dask-worker-space/worker-hbrk40rs/storage, mode="a", 0 elements>>>)

E        +    where Buffer<<LRU: 28/5023005081 on dict>, <Func: serialize_bytelist<->deserialize_bytes <File: /home/travis/build/dask/distributed/dask-worker-space/worker-hbrk40rs/storage, mode="a", 0 elements>>> = <Worker: 'tcp://127.0.0.1:35456', 0, running, stored: 1, running: 0/1, ready: 0, comm: 0, waiting: 0>.data

distributed/tests/test_steal.py:116: AssertionError

Metadata

Metadata

Assignees

No one assigned

    Labels

    flaky test: Intermittent failures on CI.

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions