Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions capabilities2_msgs/msg/Capability.msg
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Capability
string capability

# Used provider
string provider

# trigger parameters
string parameters
1 change: 1 addition & 0 deletions capabilities2_msgs/srv/ConfigureCapability.srv
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ Capability target_on_stop
Capability target_on_success
Capability target_on_failure
string connection_description
int32 trigger_id
---
2 changes: 2 additions & 0 deletions capabilities2_runner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ find_package(rclcpp REQUIRED)
find_package(rclcpp_action REQUIRED)
find_package(pluginlib REQUIRED)
find_package(capabilities2_msgs REQUIRED)
find_package(capabilities2_utils REQUIRED)
find_package(event_logger REQUIRED)
find_package(event_logger_msgs REQUIRED)
find_package(tinyxml2_vendor REQUIRED)
Expand All @@ -33,6 +34,7 @@ ament_target_dependencies(${PROJECT_NAME}
rclcpp_action
pluginlib
capabilities2_msgs
capabilities2_utils
event_logger
event_logger_msgs
TinyXML2
Expand Down
37 changes: 14 additions & 23 deletions capabilities2_runner/include/capabilities2_runner/action_runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ class ActionRunner : public RunnerBase
}

// Trigger on_stopped event if defined
if (events[execute_id].on_stopped.interface != "")
if (events[runner_id].on_stopped.interface != "")
{
event_(EventType::STOPPED, -1, events[execute_id].on_stopped.interface,
events[execute_id].on_stopped.provider);
triggerFunction_(events[execute_id].on_stopped.interface,
update_on_stopped(events[execute_id].on_stopped.parameters));
event_(EventType::STOPPED, -1, events[runner_id].on_stopped.interface, events[runner_id].on_stopped.provider);
triggerFunction_(events[runner_id].on_stopped.interface,
update_on_stopped(events[runner_id].on_stopped.parameters));
}
});

Expand Down Expand Up @@ -129,16 +128,14 @@ class ActionRunner : public RunnerBase
*/
virtual void execution(int id) override
{
execute_id += 1;

// if parameters are not provided then cannot proceed
if (!parameters_[id])
throw runner_exception("cannot trigger action without parameters");

// generate a goal from parameters if provided
goal_msg_ = generate_goal(parameters_[id], id);

info_("goal generated", id);
info_("goal generated for event ", id);

std::unique_lock<std::mutex> lock(mutex_);
completed_ = false;
Expand All @@ -151,12 +148,10 @@ class ActionRunner : public RunnerBase
info_("goal accepted. Waiting for result", id);

// trigger the events related to on_started state
if (events[execute_id].on_started.interface != "")
if (events[id].on_started.interface != "")
{
event_(EventType::STARTED, id, events[execute_id].on_started.interface,
events[execute_id].on_started.provider);
triggerFunction_(events[execute_id].on_started.interface,
update_on_started(events[execute_id].on_started.parameters));
event_(EventType::STARTED, id, events[id].on_started.interface, events[id].on_started.provider);
triggerFunction_(events[id].on_started.interface, update_on_started(events[id].on_started.parameters));
}
}
else
Expand Down Expand Up @@ -187,25 +182,21 @@ class ActionRunner : public RunnerBase
info_("action succeeded.", id);

// trigger the events related to on_success state
if (events[execute_id].on_success.interface != "")
if (events[id].on_success.interface != "")
{
event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface,
events[execute_id].on_success.provider);
triggerFunction_(events[execute_id].on_success.interface,
update_on_success(events[execute_id].on_success.parameters));
event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider);
triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters));
}
}
else
{
error_("action failed", id);

// trigger the events related to on_failure state
if (events[execute_id].on_failure.interface != "")
if (events[id].on_failure.interface != "")
{
event_(EventType::FAILED, id, events[execute_id].on_failure.interface,
events[execute_id].on_failure.provider);
triggerFunction_(events[execute_id].on_failure.interface,
update_on_failure(events[execute_id].on_failure.parameters));
event_(EventType::FAILED, id, events[id].on_failure.interface, events[id].on_failure.provider);
triggerFunction_(events[id].on_failure.interface, update_on_failure(events[id].on_failure.parameters));
}
}

Expand Down
64 changes: 39 additions & 25 deletions capabilities2_runner/include/capabilities2_runner/runner_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
#include <thread>
#include <tinyxml2.h>
#include <rclcpp/rclcpp.hpp>

#include <capabilities2_utils/event_types.hpp>

#include <event_logger_msgs/msg/event.hpp>
#include <event_logger_msgs/msg/event_capability.hpp>
#include <event_logger/event_types.hpp>
#include <event_logger/event_client.hpp>

namespace capabilities2_runner
Expand Down Expand Up @@ -61,13 +63,14 @@ struct runner_opts
std::string runner;
std::string started_by;
std::string pid;
int input_count;
};

class RunnerBase
{
public:
using Event = event_logger_msgs::msg::Event;
using EventType = event_logger::event_t;
using EventType = capabilities2::event_t;

RunnerBase() : run_config_()
{
Expand Down Expand Up @@ -105,15 +108,18 @@ class RunnerBase
*/
virtual void trigger(const std::string& parameters)
{
info_("received new parameters", thread_id);
// extract the unique id for the runner and use that as the thread id
tinyxml2::XMLElement * element = nullptr;
element = convert_to_xml(parameters);
element->QueryIntAttribute("id", &runner_id);

parameters_[thread_id] = convert_to_xml(parameters);
parameters_[runner_id] = element;

executionThreadPool[thread_id] = std::thread(&RunnerBase::execution, this, thread_id);
info_("received new parameters with event id", runner_id);

info_("started execution", thread_id);
executionThreadPool[runner_id] = std::thread(&RunnerBase::execution, this, runner_id);

thread_id += 1;
info_("started execution", runner_id);
}

/**
Expand All @@ -128,9 +134,8 @@ class RunnerBase
node_ = node;
run_config_ = run_config;

insert_id = 0;
execute_id = -1;
thread_id = 0;
current_inputs_ = 0;
runner_id = 0;

event_client_ = std::make_shared<EventClient>(node_, "runner", "/events");
}
Expand All @@ -139,20 +144,18 @@ class RunnerBase
* @brief attach events to the runner
*
* @param event_option event_options related for the action
* @param triggerFunction external function that triggers capability runners
*
* @return number of attached events
*/
int attach_events(event_logger::event_opts& event_option,
virtual void attach_events(capabilities2::event_opts& event_option,
std::function<void(const std::string&, const std::string&)> triggerFunction)
{
info_("accepted event options with ID : " + std::to_string(insert_id));
info_("accepted event options with ID : " + std::to_string(event_option.event_id));

triggerFunction_ = triggerFunction;

events[insert_id] = event_option;
insert_id += 1;

return insert_id;
events[event_option.event_id] = event_option;
}

/**
Expand Down Expand Up @@ -195,13 +198,24 @@ class RunnerBase
return run_config_.pid;
}

/**
* @brief Get the execution status of runner.
*
* @return `true` if execution is complete, `false` otherwise.
*/
const bool get_completion_status() const
{
return execution_complete_;
}

protected:
/**
* @brief Trigger process to be executed.
*
* This method utilizes paramters set via the trigger() function
*
* @param parameters pointer to tinyxml2::XMLElement that contains parameters
*
* @param id unique identifier for the runner id. used to track the correct
* triggers and subsequent events.
*
*/
virtual void execution(int id) = 0;
Expand Down Expand Up @@ -558,22 +572,22 @@ class RunnerBase
/**
* @brief dictionary of events
*/
std::map<int, event_logger::event_opts> events;
std::map<int, capabilities2::event_opts> events;

/**
* @brief Last event tracker id to be inserted
* @brief unique id for the runner
*/
int insert_id;
int runner_id;

/**
* @brief Last parameter tracker id to be executed
* @brief curent number of trigger signals received
*/
int execute_id;
int current_inputs_;

/**
* @brief Last parameter tracker id to be executed
* @brief system runner completion tracking
*/
int thread_id;
bool execution_complete_;

/**
* @brief pointer to XMLElement which contain parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,14 @@ class ServiceRunner : public RunnerBase
*/
virtual void execution(int id) override
{
execute_id += 1;

// if parameters are not provided then cannot proceed
if (!parameters_[id])
throw runner_exception("cannot trigger service without parameters");

// generate a goal from parameters if provided
auto request_msg = std::make_shared<typename ServiceT::Request>(generate_request(parameters_[id], id));

info_("request generated", id);
info_("request generated for event :", id);

std::unique_lock<std::mutex> lock(mutex_);
completed_ = false;
Expand All @@ -82,12 +80,10 @@ class ServiceRunner : public RunnerBase
error_("get result call failed");

// trigger the events related to on_failure state
if (events[execute_id].on_failure.interface != "")
if (events[id].on_failure.interface != "")
{
event_(EventType::FAILED, id, events[execute_id].on_failure.interface,
events[execute_id].on_failure.provider);
triggerFunction_(events[execute_id].on_failure.interface,
update_on_failure(events[execute_id].on_failure.parameters));
event_(EventType::FAILED, id, events[id].on_failure.interface, events[id].on_failure.provider);
triggerFunction_(events[id].on_failure.interface, update_on_failure(events[id].on_failure.parameters));
}
}
else
Expand All @@ -98,12 +94,10 @@ class ServiceRunner : public RunnerBase
process_response(response_, id);

// trigger the events related to on_success state
if (events[execute_id].on_success.interface != "")
if (events[id].on_success.interface != "")
{
event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface,
events[execute_id].on_success.provider);
triggerFunction_(events[execute_id].on_success.interface,
update_on_success(events[execute_id].on_success.parameters));
event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider);
triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters));
}
}

Expand All @@ -112,11 +106,10 @@ class ServiceRunner : public RunnerBase
});

// trigger the events related to on_started state
if (events[execute_id].on_started.interface != "")
if (events[id].on_started.interface != "")
{
event_(EventType::STARTED, id, events[execute_id].on_started.interface, events[execute_id].on_started.provider);
triggerFunction_(events[execute_id].on_started.interface,
update_on_started(events[execute_id].on_started.parameters));
event_(EventType::STARTED, id, events[id].on_started.interface, events[id].on_started.provider);
triggerFunction_(events[id].on_started.interface, update_on_started(events[id].on_started.parameters));
}

// Conditional wait
Expand All @@ -143,11 +136,11 @@ class ServiceRunner : public RunnerBase
throw runner_exception("cannot stop runner action that was not started");

// Trigger on_stopped event if defined
if (events[execute_id].on_stopped.interface != "")
if (events[runner_id].on_stopped.interface != "")
{
event_(EventType::STOPPED, -1, events[execute_id].on_stopped.interface, events[execute_id].on_stopped.provider);
triggerFunction_(events[execute_id].on_stopped.interface,
update_on_stopped(events[execute_id].on_stopped.parameters));
event_(EventType::STOPPED, -1, events[runner_id].on_stopped.interface, events[runner_id].on_stopped.provider);
triggerFunction_(events[runner_id].on_stopped.interface,
update_on_stopped(events[runner_id].on_stopped.parameters));
}

info_("stopping runner");
Expand Down
Loading