Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions launch/launch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from launch.output_handler import CompositeOutputHandler
from launch.output_handler import ConsoleOutput
from launch.exit_handler import default_exit_handler


class LaunchDescriptor(object):
    """Collect descriptions of the tasks (coroutines and processes) to launch."""

    def __init__(self):
        # ordered list of CoroutineDescriptor / ProcessDescriptor instances
        self.task_descriptors = []

    def add_coroutine(self, coroutine, name=None, exit_handler=None):
        """
        Add a coroutine to be launched.

        :param coroutine: the coroutine to schedule
        :param name: unique task name; must not collide with an existing name
        :param exit_handler: callable invoked when the coroutine finishes,
          defaults to ``default_exit_handler``
        :raises RuntimeError: if *name* is already used by another task
        """
        self._ensure_unique_name(name)
        if exit_handler is None:
            exit_handler = default_exit_handler
        self.task_descriptors.append(CoroutineDescriptor(
            coroutine, name, exit_handler))

    def add_process(self, cmd, name=None, env=None, output_handlers=None, exit_handler=None):
        """
        Add a process to be launched.

        :param cmd: command line as a sequence of arguments
        :param name: unique task name; must not collide with an existing name
        :param env: environment passed to the process (None inherits)
        :param output_handlers: list of output handlers, defaults to a single
          ``ConsoleOutput``; always wrapped in a ``CompositeOutputHandler``
        :param exit_handler: callable invoked when the process exits,
          defaults to ``default_exit_handler``
        :raises RuntimeError: if *name* is already used by another task
        """
        self._ensure_unique_name(name)
        if output_handlers is None:
            output_handlers = [ConsoleOutput()]
        output_handlers = CompositeOutputHandler(output_handlers)
        if exit_handler is None:
            exit_handler = default_exit_handler
        self.task_descriptors.append(ProcessDescriptor(
            cmd, name, output_handlers, exit_handler, env=env))

    def _ensure_unique_name(self, name):
        """Raise RuntimeError if *name* is not None and already in use."""
        if name is not None and name in [p.name for p in self.task_descriptors]:
            raise RuntimeError("Task name '%s' already used" % name)


class TaskDescriptor(object):
    """Common base for launchable task descriptions.

    Carries the runtime state of the task once the launcher has started it.
    """

    def __init__(self):
        # set to a TaskState instance when the launcher starts the task
        self.task_state = None


class CoroutineDescriptor(TaskDescriptor):
    """Describe a coroutine task to be scheduled by the launcher."""

    def __init__(self, coroutine, name, exit_handler):
        super(CoroutineDescriptor, self).__init__()
        # the coroutine object to schedule
        self.coroutine = coroutine
        # unique task name (may be None until the launcher assigns one)
        self.name = name
        # callable invoked with an exit handler context when the task finishes
        self.exit_handler = exit_handler


class ProcessDescriptor(TaskDescriptor):
    """Describe a subprocess task to be spawned by the launcher."""

    def __init__(self, cmd, name, output_handler, exit_handler, env=None):
        super(ProcessDescriptor, self).__init__()
        # command line as a sequence of arguments
        self.cmd = cmd
        # unique task name (may be None until the launcher assigns one)
        self.name = name
        # handler receiving the process output
        self.output_handler = output_handler
        # callable invoked with an exit handler context when the process exits
        self.exit_handler = exit_handler
        # environment for the subprocess; None inherits the parent environment
        self.env = env
        # asyncio transport / protocol, filled in when the process is spawned
        self.transport = None
        self.protocol = None
48 changes: 48 additions & 0 deletions launch/launch/exit_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
class ExitHandlerContext(object):
    """The context which is passed to an exit handler function."""

    def __init__(self, launch_state, task_state):
        # overall state of the launch (teardown flag, returncode)
        self.launch_state = launch_state
        # state of the task whose exit triggered this handler
        self.task_state = task_state


def default_exit_handler(context):
    """
    Trigger teardown of launch.

    Use the returncode of the task for the launch if the launch was not already tearing down.
    """
    if context.launch_state.teardown:
        # teardown already in progress; keep the existing launch returncode
        return
    context.launch_state.teardown = True
    raw_rc = context.task_state.returncode
    try:
        launch_rc = int(raw_rc)
    except (TypeError, ValueError):
        # non-numeric returncode: map truthy to failure, falsy to success
        launch_rc = 1 if raw_rc else 0
    context.launch_state.returncode = launch_rc


def ignore_exit_handler(context):
    """Continue the launch and don't affect the returncode of the launch."""
    # deliberately a no-op: the task's exit is ignored entirely


def restart_exit_handler(context):
    """Request restart of the task that just exited."""
    # the launcher checks this flag after invoking the exit handler
    context.task_state.restart = True


def primary_exit_handler(context):
    """
    Trigger teardown of launch and if teardown already in place set non-zero return code.

    Same as default exit handler but if teardown was triggered by another task
    ensure that the returncode is non-zero.
    """
    teardown_already_requested = context.launch_state.teardown
    if teardown_already_requested and not context.launch_state.returncode:
        # another task initiated teardown; report the launch as failed
        context.launch_state.returncode = 1

    default_exit_handler(context)
5 changes: 5 additions & 0 deletions launch/launch/launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class LaunchState(object):
    """Mutable state shared across one launch invocation."""

    def __init__(self):
        # returncode of the whole launch; None until a task determines it
        self.returncode = None
        # True once teardown of the launch has been requested
        self.teardown = False
254 changes: 254 additions & 0 deletions launch/launch/launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
import asyncio
from collections import OrderedDict
import os
import signal
import sys
import threading

from launch.exit_handler import ExitHandlerContext
from launch.launch import LaunchState
from launch.protocol import SubprocessProtocol
from launch.task import TaskState


class DefaultLauncher(object):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The (object) isn't required in Python3 for "new" style classes. Leaving it doesn't hurt though.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently use it in 33 other places with the object and only in one location without. I would keep it for now but we can certainly remove it in all locations if everyone agrees.


def __init__(self, name_prefix='', sigint_timeout=3):
self.name_prefix = name_prefix
self.sigint_timeout = sigint_timeout
self.task_descriptors = []
self.print_mutex = threading.Lock()

def add_launch_descriptor(self, launch_descriptor):
for task_descriptor in launch_descriptor.task_descriptors:
# automatic naming if not specified
if task_descriptor.name is None:
name = str(len(self.task_descriptors))
if name in [p.name for p in self.task_descriptors]:
raise RuntimeError("Process name '%s' already used" % name)
task_descriptor.name = name

self.task_descriptors.append(task_descriptor)

def launch(self):
loop = asyncio.get_event_loop()
returncode = loop.run_until_complete(self._run())
loop.close()

return returncode

@asyncio.coroutine
def _run(self):
launch_state = LaunchState()
for p in self.task_descriptors:
p.task_state = TaskState()

# start all processes and collect their exit futures
all_futures = OrderedDict()
for index, p in enumerate(self.task_descriptors):
if 'output_handler' in dir(p):
p.output_handler.set_print_mutex(self.print_mutex)
p.output_handler.set_line_prefix('[%s] ' % p.name)

if 'protocol' in dir(p):
yield from self._spawn_process(index)
all_futures[p.protocol.exit_future] = index
else:
future = asyncio.async(p.coroutine)
all_futures[future] = index

while True:
# skip if no more processes to run
if not all_futures:
break

# wait for any process to finish
kwargs = {
'return_when': asyncio.FIRST_COMPLETED,
}
# when the event loop run does not run in the main thread
# wake up frequently and check if any subprocess has exited
if not isinstance(threading.current_thread(), threading._MainThread):
kwargs['timeout'] = 0.5
yield from asyncio.wait(all_futures.keys(), **kwargs)

# when the event loop run does not run in the main thread
# use custom logic to detect that subprocesses have exited
if not isinstance(threading.current_thread(), threading._MainThread):
for index, p in enumerate(self.task_descriptors):
# only consider not yet done tasks
if index not in all_futures.values():
continue
# only subprocesses need special handling
if 'transport' not in dir(p):
continue
# transport.get_pid() sometimes failed due to transport._proc being None
proc = p.transport.get_extra_info('subprocess')
pid = proc.pid
try:
pid, proc_rc = os.waitpid(pid, os.WNOHANG)
except ChildProcessError:
continue
if pid == 0:
# subprocess is still running
continue
p.returncode = proc_rc
# trigger syncio internal process exit callback
p.transport._process_exited(proc_rc)

# collect done futures
done_futures = [future for future in all_futures.keys() if future.done()]

# collect return code
restart_indices = []
for future in done_futures:
index = all_futures[future]
p = self.task_descriptors[index]

# collect return code / exception from coroutine
if 'coroutine' in dir(p):
exp = future.exception()
if exp:
p.task_state.exception = exp
p.task_state.returncode = 1
# print traceback with "standard" format
with self.print_mutex:
print('(%s)' % p.name, 'Traceback (most recent call last):',
file=sys.stderr)
for frame in future.get_stack():
filename = frame.f_code.co_filename
print('(%s)' % p.name, ' File "%s", line %d, in %s' %
(filename, frame.f_lineno, frame.f_code.co_name),
file=sys.stderr)
import linecache
linecache.checkcache(filename)
line = linecache.getline(filename, frame.f_lineno, frame.f_globals)
print('(%s)' % p.name, ' ' + line.strip(), file=sys.stderr)
print('(%s) %s: %s' % (p.name, type(exp).__name__, str(exp)),
file=sys.stderr)
else:
result = future.result()
p.task_state.returncode = result
self._process_message(p, 'rc ' + str(p.task_state.returncode))

# close transport
if 'protocol' in dir(p):
self._close_process(p)

# remove future
del all_futures[future]

# call exit handler of done descriptors
context = ExitHandlerContext(launch_state, p.task_state)
p.exit_handler(context)
if p.task_state.restart:
restart_indices.append(index)

if launch_state.teardown:
with self.print_mutex:
print('() tear down')
break

# restart processes if requested
for index in restart_indices:
p = self.task_descriptors[index]
if 'protocol' in dir(p):
p.task_states[index].restart_count += 1
yield from self._spawn_process(index)
all_futures[p.protocol.exit_future] = index

# terminate all remaining processes
if all_futures:

# sending SIGINT to remaining processes
for index in all_futures.values():
p = self.task_descriptors[index]
if 'transport' in dir(p):
self._process_message(p, 'signal SIGINT')
try:
p.transport.send_signal(signal.SIGINT)
except ProcessLookupError:
pass

yield from asyncio.wait(all_futures.keys(), timeout=self.sigint_timeout)

# sending SIGINT to remaining processes
for index in all_futures.values():
p = self.task_descriptors[index]
if 'protocol' in dir(p):
if not p.protocol.exit_future.done():
self._process_message(p, 'signal SIGTERM')
try:
p.transport.send_signal(signal.SIGTERM)
except ProcessLookupError:
pass

yield from asyncio.wait(all_futures.keys())

# close all remaining processes
for index in all_futures.values():
p = self.task_descriptors[index]
if 'transport' in dir(p):
self._close_process(p)

# call exit handler of remaining descriptors
for index in all_futures.values():
p = self.task_descriptors[index]
context = ExitHandlerContext(launch_state, p.task_state)
p.exit_handler(context)

if launch_state.returncode is None:
launch_state.returncode = 0
return launch_state.returncode

def _spawn_process(self, index):
p = self.task_descriptors[index]
p.output_handler.process_init()
kwargs = {}
if p.output_handler.support_stderr2stdout():
kwargs['stderr'] = asyncio.subprocess.STDOUT
loop = asyncio.get_event_loop()
transport, protocol = yield from loop.subprocess_exec(
lambda: SubprocessProtocol(p.output_handler),
*p.cmd,
**kwargs)
p.transport = transport
p.protocol = protocol

output_handler_description = p.output_handler.get_description()
if 'stderr' in kwargs:
output_handler_description = 'stderr > stdout, ' + output_handler_description

self._process_message(
p, 'pid %d: %s (%s)' % (transport.get_pid(), p.cmd, output_handler_description))

def _close_process(self, process_descriptor):
p = process_descriptor
p.transport.close()
p.task_state.returncode = p.transport.get_returncode()
self._process_message(p, 'rc %d' % p.task_state.returncode)
p.output_handler.process_cleanup()

def _process_message(self, process_descriptor, message):
p = process_descriptor

with self.print_mutex:
print('(%s)' % p.name, message)
lines = (message + '\n').encode()
if 'output_handler' in dir(p):
p.output_handler.on_message_received(lines)


class AsynchronousLauncher(threading.Thread):
    """Run a launcher in a background thread (call ``start()`` to begin)."""

    def __init__(self, launcher):
        super(AsynchronousLauncher, self).__init__()
        # the launcher whose launch() is invoked from run()
        self.launcher = launcher

    def run(self):
        """Thread entry point: ensure an event loop exists, then launch."""
        # a non-main thread has no event loop by default, so create one
        if not isinstance(threading.current_thread(), threading._MainThread):
            asyncio.set_event_loop(asyncio.new_event_loop())

        self.launcher.launch()
7 changes: 7 additions & 0 deletions launch/launch/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from importlib.machinery import SourceFileLoader


def load_launch_file(launch_file, launch_descriptor, argv):
    """
    Load *launch_file* as a Python module and invoke its ``launch`` function.

    The module's function is called as ``launch(launch_descriptor, argv)``.

    :param launch_file: path to the Python launch file
    :param launch_descriptor: descriptor passed through to the launch function
    :param argv: arguments passed through to the launch function
    """
    # local import keeps this edit self-contained; exec_module replaces
    # SourceFileLoader.load_module(), deprecated since Python 3.4 and
    # removed in Python 3.12
    import importlib.util

    spec = importlib.util.spec_from_file_location('launch_file', launch_file)
    launch_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(launch_module)
    launch_module.launch(launch_descriptor, argv)
Loading