From ad29b276413e0d150fa1aad5373f489cf6ce5369 Mon Sep 17 00:00:00 2001 From: Kalana Ratnayake Date: Tue, 12 Aug 2025 20:28:11 +1000 Subject: [PATCH 1/5] added new multiplexer runners --- .../capabilities2_runner/runner_base.hpp | 1 + .../input_multiplex_all_runner.hpp | 108 ++++++++++++++++++ .../input_multiplex_any_runner.hpp | 108 ++++++++++++++++++ capabilities2_runner_capabilities/plugins.xml | 12 ++ .../src/capabilities2_runner.cpp | 6 +- .../capabilities2_utils/connection.hpp | 5 +- 6 files changed, 237 insertions(+), 3 deletions(-) create mode 100644 capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_all_runner.hpp create mode 100644 capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_any_runner.hpp diff --git a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp index 963d88a..3fe8e70 100644 --- a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp +++ b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp @@ -61,6 +61,7 @@ struct runner_opts std::string runner; std::string started_by; std::string pid; + int input_count; }; class RunnerBase diff --git a/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_all_runner.hpp b/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_all_runner.hpp new file mode 100644 index 0000000..322d8a3 --- /dev/null +++ b/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_all_runner.hpp @@ -0,0 +1,108 @@ +#pragma once + +#include +#include + +namespace capabilities2_runner +{ +class InputMultiplexAllRunner : public RunnerBase +{ +public: + /** + * @brief Constructor which needs to be empty due to plugin semantics + */ + InputMultiplexAllRunner() : RunnerBase() + { + } + + /** + * @brief Starter function for starting the action runner + * + * @param node shared pointer to the capabilities node. Allows to use ros node related functionalities + * @param run_config runner configuration loaded from the yaml file + */ + virtual void start(rclcpp::Node::SharedPtr node, const runner_opts& run_config) override + { + init_base(node, run_config); + + info_("Starting InputMultiplexAllRunner with " + std::to_string(run_config.input_count) + " inputs."); + + expected_inputs_ = run_config.input_count; + + current_inputs_ = 0; + not_triggered_ = true; + } + + virtual void trigger(const std::string& parameters) override + { + current_inputs_ += 1; + + if (not_triggered_) + not_triggered_ = false; + + if (current_inputs_ == expected_inputs_) + { + info_("InputMultiplexAllRunner is has fullfilled the All condition with " + std::to_string(current_inputs_) + + " inputs."); + + executionThread = std::thread(&InputMultiplexAllRunner::execution, this, thread_id); + thread_id += 1; + } + else + { + info_("InputMultiplexAllRunner waiting. Only got " + std::to_string(current_inputs_ + 1) + "/" + + std::to_string(expected_inputs_) + " inputs."); + } + } + + virtual void execution(int id) + { + // trigger the events related to on_success state + if (events[execute_id].on_success.interface != "") + { + event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, events[execute_id].on_success.provider); + triggerFunction_(events[execute_id].on_success.interface, + update_on_success(events[execute_id].on_success.parameters)); + } + // trigger the events related to on_failure state + else if (events[execute_id].on_failure.interface != "") + { + event_(EventType::FAILED, id, events[execute_id].on_failure.interface, events[execute_id].on_failure.provider); + triggerFunction_(events[execute_id].on_failure.interface, + update_on_failure(events[execute_id].on_failure.parameters)); + } + } + + /** + * @brief stop function to cease functionality and shutdown + * + */ + virtual void stop() override + { + // if the node pointer is empty then throw an error + // this means that the runner was not started and is being used out of order + + if (!node_) + throw runner_exception("cannot stop runner that was not started"); + + info_("stopping runner"); + } + + /** + * @brief Destructor + * + * Cleans up the thread if it is still running + */ + ~InputMultiplexAllRunner(); + +private: + bool not_triggered_; + + int expected_inputs_; + + int current_inputs_; + + std::thread executionThread; +}; + +} // namespace capabilities2_runner diff --git a/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_any_runner.hpp b/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_any_runner.hpp new file mode 100644 index 0000000..2adc201 --- /dev/null +++ b/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_any_runner.hpp @@ -0,0 +1,108 @@ +#pragma once + +#include +#include + +namespace capabilities2_runner +{ +class InputMultiplexAnyRunner : public RunnerBase +{ +public: + /** + * @brief Constructor which needs to be empty due to plugin semantics + */ + InputMultiplexAnyRunner() : RunnerBase() + { + } + + /** + * @brief Starter function for starting the action runner + * + * @param node shared pointer to the capabilities node. Allows to use ros node related functionalities + * @param run_config runner configuration loaded from the yaml file + */ + virtual void start(rclcpp::Node::SharedPtr node, const runner_opts& run_config) override + { + init_base(node, run_config); + + info_("Starting InputMultiplexAnyRunner with " + std::to_string(run_config.input_count) + " inputs."); + + expected_inputs_ = run_config.input_count; + + current_inputs_ = 0; + not_triggered_ = true; + } + + virtual void trigger(const std::string& parameters) override + { + current_inputs_ += 1; + + if (not_triggered_) + not_triggered_ = false; + + if (current_inputs_ >= 0) + { + info_("InputMultiplexAnyRunner is has fullfilled the ANY condition with " + std::to_string(current_inputs_) + + " inputs."); + + executionThread = std::thread(&InputMultiplexAnyRunner::execution, this, thread_id); + thread_id += 1; + } + else + { + info_("InputMultiplexAnyRunner waiting. Only got " + std::to_string(current_inputs_ + 1) + "/" + + std::to_string(expected_inputs_) + " inputs."); + } + } + + virtual void execution(int id) + { + // trigger the events related to on_success state + if (events[execute_id].on_success.interface != "") + { + event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, events[execute_id].on_success.provider); + triggerFunction_(events[execute_id].on_success.interface, + update_on_success(events[execute_id].on_success.parameters)); + } + // trigger the events related to on_failure state + else if (events[execute_id].on_failure.interface != "") + { + event_(EventType::FAILED, id, events[execute_id].on_failure.interface, events[execute_id].on_failure.provider); + triggerFunction_(events[execute_id].on_failure.interface, + update_on_failure(events[execute_id].on_failure.parameters)); + } + } + + /** + * @brief stop function to cease functionality and shutdown + * + */ + virtual void stop() override + { + // if the node pointer is empty then throw an error + // this means that the runner was not started and is being used out of order + + if (!node_) + throw runner_exception("cannot stop runner that was not started"); + + info_("stopping runner"); + } + + /** + * @brief Destructor + * + * Cleans up the thread if it is still running + */ + ~InputMultiplexAnyRunner(); + +private: + bool not_triggered_; + + int expected_inputs_; + + int current_inputs_; + + std::thread executionThread; +}; + +} // namespace capabilities2_runner diff --git a/capabilities2_runner_capabilities/plugins.xml b/capabilities2_runner_capabilities/plugins.xml index f4ff0f8..df235ea 100644 --- a/capabilities2_runner_capabilities/plugins.xml +++ b/capabilities2_runner_capabilities/plugins.xml @@ -4,4 +4,16 @@ A plugin that request capabilities from the capabilities server and transfers to an LLM + + + A plugin that Acts as a multiplexer for multiple input streams, allowing them to be processed in a single runner. + It can handle multiple input streams and route them to the appropriate output. + + + + + A plugin that Acts as a multiplexer for multiple input streams, allowing them to be processed in a single runner. + It can handle multiple input streams and route them to the appropriate output. + + diff --git a/capabilities2_runner_capabilities/src/capabilities2_runner.cpp b/capabilities2_runner_capabilities/src/capabilities2_runner.cpp index c6c35a5..a07e42f 100644 --- a/capabilities2_runner_capabilities/src/capabilities2_runner.cpp +++ b/capabilities2_runner_capabilities/src/capabilities2_runner.cpp @@ -1,6 +1,10 @@ #include #include #include +#include +#include // register runner plugins -PLUGINLIB_EXPORT_CLASS(capabilities2_runner::CapabilityGetRunner, capabilities2_runner::RunnerBase) +PLUGINLIB_EXPORT_CLASS(capabilities2_runner::CapabilityGetRunner, capabilities2_runner::RunnerBase); +PLUGINLIB_EXPORT_CLASS(capabilities2_runner::InputMultiplexAllRunner, capabilities2_runner::RunnerBase); +PLUGINLIB_EXPORT_CLASS(capabilities2_runner::InputMultiplexAnyRunner, capabilities2_runner::RunnerBase); diff --git a/capabilities2_utils/include/capabilities2_utils/connection.hpp b/capabilities2_utils/include/capabilities2_utils/connection.hpp index 0d68d8e..d6b68dc 100644 --- a/capabilities2_utils/include/capabilities2_utils/connection.hpp +++ b/capabilities2_utils/include/capabilities2_utils/connection.hpp @@ -14,8 +14,9 @@ namespace capabilities2 struct connection_t { - std::string runner; - std::string provider; + std::string runner = ""; + std::string provider = ""; + int input_count = 0; tinyxml2::XMLElement* parameters = nullptr; }; From 23cbb27dc3c99ee87e4548a5140d2a4ecde8fc20 Mon Sep 17 00:00:00 2001 From: Kalana Ratnayake Date: Wed, 13 Aug 2025 17:35:10 +1000 Subject: [PATCH 2/5] updated capabilities2 server backend and refactored runners --- capabilities2_msgs/msg/Capability.msg | 2 + capabilities2_msgs/srv/UseCapability.srv | 1 + .../capabilities2_runner/runner_base.hpp | 21 ++++++ capabilities2_runner_capabilities/plugins.xml | 12 ---- .../src/capabilities2_runner.cpp | 4 -- capabilities2_runner_fabric/plugins.xml | 5 -- .../src/fabric_runners.cpp | 2 - capabilities2_runner_system/.clang-format | 64 +++++++++++++++++++ capabilities2_runner_system/CMakeLists.txt | 61 ++++++++++++++++++ .../completion_runner.hpp | 6 +- .../input_multiplex_all_runner.hpp | 37 +++++------ .../input_multiplex_any_runner.hpp | 38 ++++++----- capabilities2_runner_system/package.xml | 35 ++++++++++ capabilities2_runner_system/plugins.xml | 19 ++++++ .../src/capabilities2_runner.cpp | 10 +++ .../capabilities2_server/capabilities_api.hpp | 15 +++-- .../capabilities_server.hpp | 8 ++- .../capabilities2_server/runner_cache.hpp | 56 +++++++++++++--- system_capabilities/CMakeLists.txt | 16 +++++ system_capabilities/LICENSE | 17 +++++ .../interfaces/CompletionRunner.yaml | 13 ++++ .../interfaces/InputMultiplexAllRunner.yaml | 8 +++ .../interfaces/InputMultiplexAnyRunner.yaml | 8 +++ system_capabilities/package.xml | 48 ++++++++++++++ .../providers/CompletionRunner.yaml | 8 +++ .../providers/InputMultiplexAllRunner.yaml | 8 +++ .../providers/InputMultiplexAnyRunner.yaml | 8 +++ 27 files changed, 449 insertions(+), 81 deletions(-) create mode 100644 capabilities2_runner_system/.clang-format create mode 100644 capabilities2_runner_system/CMakeLists.txt rename {capabilities2_runner_fabric/include/capabilities2_runner_fabric => capabilities2_runner_system/include/capabilities2_runner_system}/completion_runner.hpp (89%) rename {capabilities2_runner_capabilities/include/capabilities2_runner_capabilities => capabilities2_runner_system/include/capabilities2_runner_system}/input_multiplex_all_runner.hpp (77%) rename {capabilities2_runner_capabilities/include/capabilities2_runner_capabilities => capabilities2_runner_system/include/capabilities2_runner_system}/input_multiplex_any_runner.hpp (77%) create mode 100644 capabilities2_runner_system/package.xml create mode 100644 capabilities2_runner_system/plugins.xml create mode 100644 capabilities2_runner_system/src/capabilities2_runner.cpp create mode 100644 system_capabilities/CMakeLists.txt create mode 100644 system_capabilities/LICENSE create mode 100644 system_capabilities/interfaces/CompletionRunner.yaml create mode 100644 system_capabilities/interfaces/InputMultiplexAllRunner.yaml create mode 100644 system_capabilities/interfaces/InputMultiplexAnyRunner.yaml create mode 100644 system_capabilities/package.xml create mode 100644 system_capabilities/providers/CompletionRunner.yaml create mode 100644 system_capabilities/providers/InputMultiplexAllRunner.yaml create mode 100644 system_capabilities/providers/InputMultiplexAnyRunner.yaml diff --git a/capabilities2_msgs/msg/Capability.msg b/capabilities2_msgs/msg/Capability.msg index 73384f0..5bf97a1 100644 --- a/capabilities2_msgs/msg/Capability.msg +++ b/capabilities2_msgs/msg/Capability.msg @@ -1,6 +1,8 @@ # Capability string capability + # Used provider string provider + # trigger parameters string parameters diff --git a/capabilities2_msgs/srv/UseCapability.srv b/capabilities2_msgs/srv/UseCapability.srv index fd6b98e..148ab0d 100644 --- a/capabilities2_msgs/srv/UseCapability.srv +++ b/capabilities2_msgs/srv/UseCapability.srv @@ -1,4 +1,5 @@ string capability string preferred_provider +int32 input_count string bond_id --- diff --git a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp index 3fe8e70..6eb299c 100644 --- a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp +++ b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp @@ -132,6 +132,7 @@ class RunnerBase insert_id = 0; execute_id = -1; thread_id = 0; + current_inputs_ = 0; event_client_ = std::make_shared(node_, "runner", "/events"); } @@ -196,6 +197,16 @@ class RunnerBase return run_config_.pid; } + /** + * @brief Get the execution status of runner. + * + * @return `true` if execution is complete, `false` otherwise. + */ + const bool get_completion_status() const + { + return execution_complete_; + } + protected: /** * @brief Trigger process to be executed. @@ -576,6 +587,16 @@ class RunnerBase */ int thread_id; + /** + * @brief curent number of trigger signals received + */ + int current_inputs_; + + /** + * @brief system runner completion tracking + */ + bool execution_complete_; + /** * @brief pointer to XMLElement which contain parameters */ diff --git a/capabilities2_runner_capabilities/plugins.xml b/capabilities2_runner_capabilities/plugins.xml index df235ea..f4ff0f8 100644 --- a/capabilities2_runner_capabilities/plugins.xml +++ b/capabilities2_runner_capabilities/plugins.xml @@ -4,16 +4,4 @@ A plugin that request capabilities from the capabilities server and transfers to an LLM - - - A plugin that Acts as a multiplexer for multiple input streams, allowing them to be processed in a single runner. - It can handle multiple input streams and route them to the appropriate output. - - - - - A plugin that Acts as a multiplexer for multiple input streams, allowing them to be processed in a single runner. - It can handle multiple input streams and route them to the appropriate output. - - diff --git a/capabilities2_runner_capabilities/src/capabilities2_runner.cpp b/capabilities2_runner_capabilities/src/capabilities2_runner.cpp index a07e42f..bcdf964 100644 --- a/capabilities2_runner_capabilities/src/capabilities2_runner.cpp +++ b/capabilities2_runner_capabilities/src/capabilities2_runner.cpp @@ -1,10 +1,6 @@ #include #include #include -#include -#include // register runner plugins PLUGINLIB_EXPORT_CLASS(capabilities2_runner::CapabilityGetRunner, capabilities2_runner::RunnerBase); -PLUGINLIB_EXPORT_CLASS(capabilities2_runner::InputMultiplexAllRunner, capabilities2_runner::RunnerBase); -PLUGINLIB_EXPORT_CLASS(capabilities2_runner::InputMultiplexAnyRunner, capabilities2_runner::RunnerBase); diff --git a/capabilities2_runner_fabric/plugins.xml b/capabilities2_runner_fabric/plugins.xml index 5168f4b..07e8a06 100644 --- a/capabilities2_runner_fabric/plugins.xml +++ b/capabilities2_runner_fabric/plugins.xml @@ -1,9 +1,4 @@ - - - A plugin that notifies about the completion of the fabric to the action server - - A plugin that sets a new Fabric plan to the Fabric diff --git a/capabilities2_runner_fabric/src/fabric_runners.cpp b/capabilities2_runner_fabric/src/fabric_runners.cpp index 6167db2..703c1e1 100644 --- a/capabilities2_runner_fabric/src/fabric_runners.cpp +++ b/capabilities2_runner_fabric/src/fabric_runners.cpp @@ -1,8 +1,6 @@ #include #include -#include #include // register runner plugins -PLUGINLIB_EXPORT_CLASS(capabilities2_runner::FabricCompletionRunner, capabilities2_runner::RunnerBase) PLUGINLIB_EXPORT_CLASS(capabilities2_runner::FabricSetPlanRunner, capabilities2_runner::RunnerBase) diff --git a/capabilities2_runner_system/.clang-format b/capabilities2_runner_system/.clang-format new file mode 100644 index 0000000..d36804f --- /dev/null +++ b/capabilities2_runner_system/.clang-format @@ -0,0 +1,64 @@ + +BasedOnStyle: Google +AccessModifierOffset: -2 +ConstructorInitializerIndentWidth: 2 +AlignEscapedNewlinesLeft: false +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: false +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AllowShortFunctionsOnASingleLine: None +AlwaysBreakTemplateDeclarations: true +AlwaysBreakBeforeMultilineStrings: false +BreakBeforeBinaryOperators: false +BreakBeforeTernaryOperators: false +BreakConstructorInitializersBeforeComma: true +BinPackParameters: true +ColumnLimit: 120 +ConstructorInitializerAllOnOneLineOrOnePerLine: true +DerivePointerBinding: false +PointerBindsToType: true +ExperimentalAutoDetectBinPacking: false +IndentCaseLabels: true +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCSpaceBeforeProtocolList: true +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 60 +PenaltyBreakString: 1 +PenaltyBreakFirstLessLess: 1000 +PenaltyExcessCharacter: 1000 +PenaltyReturnTypeOnItsOwnLine: 90 +SpacesBeforeTrailingComments: 2 +Cpp11BracedListStyle: false +Standard: Auto +IndentWidth: 2 +TabWidth: 2 +UseTab: Never +IndentFunctionDeclarationAfterType: false +SpacesInParentheses: false +SpacesInAngles: false +SpaceInEmptyParentheses: false +SpacesInCStyleCastParentheses: false +SpaceAfterControlStatementKeyword: true +SpaceBeforeAssignmentOperators: true +ContinuationIndentWidth: 4 +SortIncludes: false +SpaceAfterCStyleCast: false + +# Configure each individual brace in BraceWrapping +BreakBeforeBraces: Custom + +# Control of individual brace wrapping cases +BraceWrapping: { + AfterClass: 'true' + AfterControlStatement: 'true' + AfterEnum : 'true' + AfterFunction : 'true' + AfterNamespace : 'true' + AfterStruct : 'true' + AfterUnion : 'true' + BeforeCatch : 'true' + BeforeElse : 'true' + IndentBraces : 'false' +} diff --git a/capabilities2_runner_system/CMakeLists.txt b/capabilities2_runner_system/CMakeLists.txt new file mode 100644 index 0000000..6f4d9c3 --- /dev/null +++ b/capabilities2_runner_system/CMakeLists.txt @@ -0,0 +1,61 @@ +cmake_minimum_required(VERSION 3.8) +project(capabilities2_runner_system) + +# Default to C++17 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 17) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(rclcpp_action REQUIRED) +find_package(pluginlib REQUIRED) +find_package(capabilities2_msgs REQUIRED) +find_package(capabilities2_runner REQUIRED) +find_package(fabric_msgs REQUIRED) +find_package(event_logger REQUIRED) +find_package(event_logger_msgs REQUIRED) +find_package(tinyxml2_vendor REQUIRED) +find_package(TinyXML2 REQUIRED) # provided by tinyxml2 upstream, or tinyxml2_vendor + +include_directories( + include +) + +add_library(${PROJECT_NAME} SHARED + src/capabilities2_runner.cpp +) + +ament_target_dependencies(${PROJECT_NAME} + rclcpp + rclcpp_action + pluginlib + capabilities2_msgs + fabric_msgs + capabilities2_runner + event_logger + event_logger_msgs + TinyXML2 +) + +pluginlib_export_plugin_description_file(capabilities2_runner plugins.xml) + +install(TARGETS ${PROJECT_NAME} + EXPORT ${PROJECT_NAME} + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) + +install(DIRECTORY include/ + DESTINATION include +) + +ament_export_include_directories(include) +ament_export_libraries(${PROJECT_NAME}) + +ament_package() diff --git a/capabilities2_runner_fabric/include/capabilities2_runner_fabric/completion_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/completion_runner.hpp similarity index 89% rename from capabilities2_runner_fabric/include/capabilities2_runner_fabric/completion_runner.hpp rename to capabilities2_runner_system/include/capabilities2_runner_system/completion_runner.hpp index 2475636..aeb2056 100644 --- a/capabilities2_runner_fabric/include/capabilities2_runner_fabric/completion_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/completion_runner.hpp @@ -8,16 +8,16 @@ namespace capabilities2_runner { /** - * @brief fabric completion runner + * @brief completion runner * * This class is a wrapper around the capabilities2 service runner and is used to * call on the /capabilities_fabric/set_completion service, providing it as a * capability that notifys the completion of the fabric */ -class FabricCompletionRunner : public ServiceRunner +class CompletionRunner : public ServiceRunner { public: - FabricCompletionRunner() : ServiceRunner() + CompletionRunner() : ServiceRunner() { } diff --git a/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_all_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp similarity index 77% rename from capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_all_runner.hpp rename to capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp index 322d8a3..21e6298 100644 --- a/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_all_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp @@ -25,36 +25,36 @@ class InputMultiplexAllRunner : public RunnerBase { init_base(node, run_config); - info_("Starting InputMultiplexAllRunner with " + std::to_string(run_config.input_count) + " inputs."); - - expected_inputs_ = run_config.input_count; - - current_inputs_ = 0; - not_triggered_ = true; + info_("started with " + std::to_string(run_config.input_count) + " inputs."); } + /** + * @brief trigger function to handle multiplexing of all inputs based on ALL condition + * + * @param parameters not used in this runner + */ virtual void trigger(const std::string& parameters) override { current_inputs_ += 1; - if (not_triggered_) - not_triggered_ = false; - - if (current_inputs_ == expected_inputs_) + if (current_inputs_ == run_config_.input_count) { - info_("InputMultiplexAllRunner is has fullfilled the All condition with " + std::to_string(current_inputs_) + - " inputs."); + info_("has fullfilled the All condition with " + std::to_string(current_inputs_) + " inputs."); executionThread = std::thread(&InputMultiplexAllRunner::execution, this, thread_id); thread_id += 1; } else { - info_("InputMultiplexAllRunner waiting. Only got " + std::to_string(current_inputs_ + 1) + "/" + - std::to_string(expected_inputs_) + " inputs."); + info_("only got " + std::to_string(current_inputs_) + "/" + std::to_string(run_config_.input_count) + " inputs."); } } + /** + * @brief Trigger process to be executed. + * + * @param id thread id + */ virtual void execution(int id) { // trigger the events related to on_success state @@ -96,12 +96,9 @@ class InputMultiplexAllRunner : public RunnerBase ~InputMultiplexAllRunner(); private: - bool not_triggered_; - - int expected_inputs_; - - int current_inputs_; - + /** + * @brief execution thread to handle the execution of the runner + */ std::thread executionThread; }; diff --git a/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_any_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp similarity index 77% rename from capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_any_runner.hpp rename to capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp index 2adc201..011cbf0 100644 --- a/capabilities2_runner_capabilities/include/capabilities2_runner_capabilities/input_multiplex_any_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp @@ -25,36 +25,36 @@ class InputMultiplexAnyRunner : public RunnerBase { init_base(node, run_config); - info_("Starting InputMultiplexAnyRunner with " + std::to_string(run_config.input_count) + " inputs."); - - expected_inputs_ = run_config.input_count; - - current_inputs_ = 0; - not_triggered_ = true; + info_("started with " + std::to_string(run_config.input_count) + " inputs."); } + /** + * @brief trigger function to handle multiplexing of all inputs based on ANY condition + * + * @param parameters not used in this runner + */ virtual void trigger(const std::string& parameters) override { current_inputs_ += 1; - if (not_triggered_) - not_triggered_ = false; - - if (current_inputs_ >= 0) + if (current_inputs_ > 0) { - info_("InputMultiplexAnyRunner is has fullfilled the ANY condition with " + std::to_string(current_inputs_) + - " inputs."); + info_("has fullfilled the ANY condition with " + std::to_string(current_inputs_) + " inputs."); executionThread = std::thread(&InputMultiplexAnyRunner::execution, this, thread_id); thread_id += 1; } else { - info_("InputMultiplexAnyRunner waiting. Only got " + std::to_string(current_inputs_ + 1) + "/" + - std::to_string(expected_inputs_) + " inputs."); + info_("only got " + std::to_string(current_inputs_) + "/" + std::to_string(run_config_.input_count) + " inputs."); } } + /** + * @brief Trigger process to be executed. + * + * @param id thread id + */ virtual void execution(int id) { // trigger the events related to on_success state @@ -64,6 +64,7 @@ class InputMultiplexAnyRunner : public RunnerBase triggerFunction_(events[execute_id].on_success.interface, update_on_success(events[execute_id].on_success.parameters)); } + // trigger the events related to on_failure state else if (events[execute_id].on_failure.interface != "") { @@ -96,12 +97,9 @@ class InputMultiplexAnyRunner : public RunnerBase ~InputMultiplexAnyRunner(); private: - bool not_triggered_; - - int expected_inputs_; - - int current_inputs_; - + /** + * @brief execution thread to handle the execution of the runner + */ std::thread executionThread; }; diff --git a/capabilities2_runner_system/package.xml b/capabilities2_runner_system/package.xml new file mode 100644 index 0000000..2f3f807 --- /dev/null +++ b/capabilities2_runner_system/package.xml @@ -0,0 +1,35 @@ + + + + capabilities2_runner_system + 0.0.0 + TODO: Package description + + + Kalana Ratnayake + Kalana Ratnayake + + Kalana Ratnayake + + TODO: License declaration + + ament_cmake + + ament_lint_auto + ament_lint_common + + rclcpp + rclcpp_action + pluginlib + std_msgs + capabilities2_msgs + capabilities2_runner + fabric_msgs + event_logger + event_logger_msgs + tinyxml2_vendor + + + ament_cmake + + diff --git a/capabilities2_runner_system/plugins.xml b/capabilities2_runner_system/plugins.xml new file mode 100644 index 0000000..41e8b96 --- /dev/null +++ b/capabilities2_runner_system/plugins.xml @@ -0,0 +1,19 @@ + + + + A plugin that Acts as a multiplexer for multiple input streams, allowing them to be processed in a single runner. + It can handle multiple input streams and route them to the appropriate output. + + + + + A plugin that Acts as a multiplexer for multiple input streams, allowing them to be processed in a single runner. + It can handle multiple input streams and route them to the appropriate output. + + + + + A plugin that notifies about the completion of the fabric to the action server + + + diff --git a/capabilities2_runner_system/src/capabilities2_runner.cpp b/capabilities2_runner_system/src/capabilities2_runner.cpp new file mode 100644 index 0000000..9d828da --- /dev/null +++ b/capabilities2_runner_system/src/capabilities2_runner.cpp @@ -0,0 +1,10 @@ +#include +#include +#include +#include +#include + +// register runner plugins +PLUGINLIB_EXPORT_CLASS(capabilities2_runner::InputMultiplexAllRunner, capabilities2_runner::RunnerBase); +PLUGINLIB_EXPORT_CLASS(capabilities2_runner::InputMultiplexAnyRunner, capabilities2_runner::RunnerBase); +PLUGINLIB_EXPORT_CLASS(capabilities2_runner::CompletionRunner, capabilities2_runner::RunnerBase) \ No newline at end of file diff --git a/capabilities2_server/include/capabilities2_server/capabilities_api.hpp b/capabilities2_server/include/capabilities2_server/capabilities_api.hpp index 6e9bf96..9901150 100644 --- a/capabilities2_server/include/capabilities2_server/capabilities_api.hpp +++ b/capabilities2_server/include/capabilities2_server/capabilities_api.hpp @@ -69,10 +69,12 @@ class CapabilitiesAPI * @param node ros node pointer of the ros server * @param capability capability name to be started * @param provider provider of the capability + * @param input_count number of inputs for the capability * * @return `true` if capability started successfully. else returns `false` */ - bool start_capability(rclcpp::Node::SharedPtr node, const std::string& capability, const std::string& provider) + bool start_capability(rclcpp::Node::SharedPtr node, const std::string& capability, const std::string& provider, + int input_count= 0) { // return value bool value = true; @@ -96,14 +98,14 @@ class CapabilitiesAPI // get the provider specification for the capability models::run_config_model_t run_config = cap_db_->get_run_config(provider); - // create a new runner - // this call implicitly starts the runner + // create a new runner, this call implicitly starts the runner // create a runner id which is the cap name to uniquely identify the runner // this means only one runner per capability name + // // TODO: consider the logic for multiple runners per capability try { - runner_cache_.add_runner(node, capability, run_config); + runner_cache_.add_runner(node, capability, run_config, input_count); event_->info("started capability: " + capability + " with provider: " + provider); @@ -220,18 +222,19 @@ class CapabilitiesAPI * @param node ros node pointer of the ros server * @param capability capability name to be started * @param provider provider of the capability + * @param input_count number of inputs for the capability * @param bond_id bond_id for the capability * * @return `true` if capability started successfully. else returns `false` */ bool use_capability(rclcpp::Node::SharedPtr node, const std::string& capability, const std::string& provider, - const std::string& bond_id) + int input_count, const std::string& bond_id) { // add bond to cache for capability bond_cache_.add_bond(capability, bond_id); // start the capability with the provider - return start_capability(node, capability, provider); + return start_capability(node, capability, provider, input_count); } /** diff --git a/capabilities2_server/include/capabilities2_server/capabilities_server.hpp b/capabilities2_server/include/capabilities2_server/capabilities_server.hpp index 32528b6..04d6397 100644 --- a/capabilities2_server/include/capabilities2_server/capabilities_server.hpp +++ b/capabilities2_server/include/capabilities2_server/capabilities_server.hpp @@ -308,8 +308,14 @@ class CapabilitiesServer : public rclcpp::Node, public CapabilitiesAPI return; } + if (req->input_count > 0) + { + event_->error("use_capability: input count should be non zero"); + return; + } + // use capability with this bond - use_capability(shared_from_this(), req->capability, req->preferred_provider, req->bond_id); + use_capability(shared_from_this(), req->capability, req->preferred_provider, req->input_count, req->bond_id); // response is empty } diff --git a/capabilities2_server/include/capabilities2_server/runner_cache.hpp b/capabilities2_server/include/capabilities2_server/runner_cache.hpp index 1b20f79..4a21f21 100644 --- a/capabilities2_server/include/capabilities2_server/runner_cache.hpp +++ b/capabilities2_server/include/capabilities2_server/runner_cache.hpp @@ -50,10 +50,10 @@ class RunnerCache * @param run_config run_config of the runner to be loaded */ void add_runner(rclcpp::Node::SharedPtr node, const std::string& capability, - const models::run_config_model_t& run_config) + const models::run_config_model_t& run_config, int input_count = 0) { - // if the runner exists then throw an error - if (running(capability)) + // if the runner exists then throw an error preserving uniqueness + if (!is_system_capability(capability) && running(capability)) { // already running throw capabilities2_runner::runner_exception("capability is running already: " + capability); @@ -66,22 +66,28 @@ class RunnerCache throw capabilities2_runner::runner_exception("run config is not valid: " + YAML::Dump(run_config.to_yaml())); } - // create the runner - // add the runner to map - // if the spec runner contains a path to a launch file then use the launch file runner + // create the runner, add the runner to map, and if the spec runner contains a path to a launch file then use the + // launch file runner if (run_config.runner.find(".launch") != std::string::npos || run_config.runner.find("/") != std::string::npos || run_config.runner.find(".py") != std::string::npos) { runner_cache_[capability] = runner_loader_.createSharedInstance("capabilities2_runner::LaunchRunner"); } + else if (is_system_capability(capability)) + { + system_runner_cache_[capability].push_back(runner_loader_.createSharedInstance(run_config.runner)); + } else { - // use different runner types based on cap and provider specs runner_cache_[capability] = runner_loader_.createSharedInstance(run_config.runner); } + // add input count to the runner options + capabilities2_runner::runner_opts opts = run_config.to_runner_opts(); + opts.input_count = input_count; + // start the runner - runner_cache_[capability]->start(node, run_config.to_runner_opts()); + runner_cache_[capability]->start(node, opts); } /** @@ -130,6 +136,24 @@ class RunnerCache */ void remove_runner(const std::string& capability) { + if (is_system_capability(capability)) + { + // remove from system runner cache + auto it = system_runner_cache_.find(capability); + + // loop through runners and stop them + if (it != system_runner_cache_.end()) + { + for (auto& runner : it->second) + if (runner->get_completion_status()) + runner->stop(); + + // system_runner_cache_.erase(it); + } + } + + // ---- non-system: preserve your current unique semantics ---- + // find the runner in the cache if (!running(capability)) { @@ -225,11 +249,27 @@ class RunnerCache return runner_cache_.find(capability) != runner_cache_.end(); } + /** + * @brief Check if a capability is a system capability + * + * @param cap capability name + * @return true if it is a system capability + * @return false otherwise + */ + bool is_system_capability(const std::string& capability) + { + // (future-proof: new system runners still match) + return capability.rfind("system_capabilities/", 0) == 0; + } + private: // map capability to running model // capability / provider specs -> runner std::map> runner_cache_; + // system runner cache that allows duplicates + std::map>> system_runner_cache_; + // runner plugin loader pluginlib::ClassLoader runner_loader_; diff --git a/system_capabilities/CMakeLists.txt b/system_capabilities/CMakeLists.txt new file mode 100644 index 0000000..1d10d3b --- /dev/null +++ b/system_capabilities/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 3.8) +project(system_capabilities) + +find_package(ament_cmake REQUIRED) + +# install interface files +install(DIRECTORY interfaces + DESTINATION share/${PROJECT_NAME} +) + +# install semantic interface files +install(DIRECTORY providers + DESTINATION share/${PROJECT_NAME} +) + +ament_package() diff --git a/system_capabilities/LICENSE b/system_capabilities/LICENSE new file mode 100644 index 0000000..30e8e2e --- /dev/null +++ b/system_capabilities/LICENSE @@ -0,0 +1,17 @@ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/system_capabilities/interfaces/CompletionRunner.yaml b/system_capabilities/interfaces/CompletionRunner.yaml new file mode 100644 index 0000000..903af66 --- /dev/null +++ b/system_capabilities/interfaces/CompletionRunner.yaml @@ -0,0 +1,13 @@ +%YAML 1.1 +--- +name: CompletionRunner +spec_version: 1.1 +spec_type: interface +description: "This capability notifies the completion of the capabilities fabric to the completion_server on the capabilities2 fabric + package. This is included on every fabric as the last capability to be triggered during connection idenification for the + fabric. A decision making authority such as an LLM does not need to include this in plans generated by it." +interface: + actions: + "/capabilities_fabric/set_completion": + type: "capabilities2_msgs::srv::CompleteFabric" + description: "Fabric completion notifier interface of the Capabilities fabric" \ No newline at end of file diff --git a/system_capabilities/interfaces/InputMultiplexAllRunner.yaml b/system_capabilities/interfaces/InputMultiplexAllRunner.yaml new file mode 100644 index 0000000..a587da6 --- /dev/null +++ b/system_capabilities/interfaces/InputMultiplexAllRunner.yaml @@ -0,0 +1,8 @@ +%YAML 1.1 +--- +name: InputMultiplexAllRunner +spec_version: 1.1 +spec_type: interface +description: "This capability combines the results of all input by multiplexing events into a single interface. It allows the robot to wait or + multiple parallel processes at once until completion. This is inserted by the system itself and a decision making authority such + as an LLM does not need to include this in plans generated by it." \ No newline at end of file diff --git a/system_capabilities/interfaces/InputMultiplexAnyRunner.yaml b/system_capabilities/interfaces/InputMultiplexAnyRunner.yaml new file mode 100644 index 0000000..3b06509 --- /dev/null +++ b/system_capabilities/interfaces/InputMultiplexAnyRunner.yaml @@ -0,0 +1,8 @@ +%YAML 1.1 +--- +name: InputMultiplexAnyRunner +spec_version: 1.1 +spec_type: interface +description: "This capability combines the results of any (at least one) input by multiplexing events into a single interface. It allows the + robot to wait or multiple parallel processes at once until completion. This is inserted by the system itself and a decision + making authority such as an LLM does not need to include this in plans generated by it." \ No newline at end of file diff --git a/system_capabilities/package.xml b/system_capabilities/package.xml new file mode 100644 index 0000000..6bbf60c --- /dev/null +++ b/system_capabilities/package.xml @@ -0,0 +1,48 @@ + + + + system_capabilities + 0.0.0 + TODO: Package description + kalana + MIT + + ament_cmake + + ament_lint_auto + ament_lint_common + + + ament_cmake + + + + interfaces/CompletionRunner.yaml + + + + providers/CompletionRunner.yaml + + + + + interfaces/InputMultiplexAllRunner.yaml + + + + providers/InputMultiplexAllRunner.yaml + + + + + + interfaces/InputMultiplexAnyRunner.yaml + + + + providers/InputMultiplexAnyRunner.yaml + + + + + diff --git a/system_capabilities/providers/CompletionRunner.yaml b/system_capabilities/providers/CompletionRunner.yaml new file mode 100644 index 0000000..ca84100 --- /dev/null +++ b/system_capabilities/providers/CompletionRunner.yaml @@ -0,0 +1,8 @@ +%YAML 1.1 +--- +name: CompletionRunner +spec_type: provider +spec_version: 1.1 +description: "The capability provider for the /capabilities_fabric/set_completion interface" +implements: system_capabilities/CompletionRunner +runner: capabilities2_runner::CompletionRunner diff --git a/system_capabilities/providers/InputMultiplexAllRunner.yaml b/system_capabilities/providers/InputMultiplexAllRunner.yaml new file mode 100644 index 0000000..b3edbfd --- /dev/null +++ b/system_capabilities/providers/InputMultiplexAllRunner.yaml @@ -0,0 +1,8 @@ +%YAML 1.1 +--- +name: InputMultiplexAllRunner +spec_type: provider +spec_version: 1.1 +description: "The capability provider for the system_capabilities/InputMultiplexAllRunner interface" +implements: system_capabilities/InputMultiplexAllRunner +runner: capabilities2_runner::InputMultiplexAllRunner \ No newline at end of file diff --git a/system_capabilities/providers/InputMultiplexAnyRunner.yaml b/system_capabilities/providers/InputMultiplexAnyRunner.yaml new file mode 100644 index 0000000..789ee28 --- /dev/null +++ b/system_capabilities/providers/InputMultiplexAnyRunner.yaml @@ -0,0 +1,8 @@ +%YAML 1.1 +--- +name: InputMultiplexAnyRunner +spec_type: provider +spec_version: 1.1 +description: "The capability provider for the system_capabilities/InputMultiplexAnyRunner interface" +implements: system_capabilities/InputMultiplexAnyRunner +runner: capabilities2_runner::InputMultiplexAnyRunner \ No newline at end of file From dd7e5d79205cdc14b04044044e4e6485fc8e6a4f Mon Sep 17 00:00:00 2001 From: Kalana Ratnayake Date: Fri, 15 Aug 2025 17:04:59 +1000 Subject: [PATCH 3/5] minor modifiations after testing --- capabilities2_msgs/srv/UseCapability.srv | 1 - .../capabilities2_runner/runner_base.hpp | 3 +- .../completion_runner.hpp | 2 +- .../input_multiplex_all_runner.hpp | 92 ++++--------- .../input_multiplex_any_runner.hpp | 91 +++---------- .../multiplex_base_runner.hpp | 122 ++++++++++++++++++ .../capabilities2_server/capabilities_api.hpp | 12 +- .../capabilities_server.hpp | 8 +- .../capabilities2_server/runner_cache.hpp | 60 +-------- .../capabilities2_utils/connection.hpp | 1 - .../interfaces/CompletionRunner.yaml | 2 +- .../interfaces/InputMultiplexAllRunner.yaml | 7 +- .../interfaces/InputMultiplexAnyRunner.yaml | 7 +- .../providers/CompletionRunner.yaml | 2 +- 14 files changed, 195 insertions(+), 215 deletions(-) create mode 100644 capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp diff --git a/capabilities2_msgs/srv/UseCapability.srv b/capabilities2_msgs/srv/UseCapability.srv index 148ab0d..fd6b98e 100644 --- a/capabilities2_msgs/srv/UseCapability.srv +++ b/capabilities2_msgs/srv/UseCapability.srv @@ -1,5 +1,4 @@ string capability string preferred_provider -int32 input_count string bond_id --- diff --git a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp index 6eb299c..a9115bf 100644 --- a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp +++ b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp @@ -141,10 +141,11 @@ class RunnerBase * @brief attach events to the runner * * @param event_option event_options related for the action + * @param triggerFunction external function that triggers capability runners * * @return number of attached events */ - int attach_events(event_logger::event_opts& event_option, + virtual int attach_events(event_logger::event_opts& event_option, std::function triggerFunction) { info_("accepted event options with ID : " + std::to_string(insert_id)); diff --git a/capabilities2_runner_system/include/capabilities2_runner_system/completion_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/completion_runner.hpp index aeb2056..8e5dcdc 100644 --- a/capabilities2_runner_system/include/capabilities2_runner_system/completion_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/completion_runner.hpp @@ -11,7 +11,7 @@ namespace capabilities2_runner * @brief completion runner * * This class is a wrapper around the capabilities2 service runner and is used to - * call on the /capabilities_fabric/set_completion service, providing it as a + * call on the /fabric/set_completion service, providing it as a * capability that notifys the completion of the fabric */ class CompletionRunner : public ServiceRunner diff --git a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp index 21e6298..6def71a 100644 --- a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp @@ -1,33 +1,19 @@ #pragma once -#include -#include +#include namespace capabilities2_runner { -class InputMultiplexAllRunner : public RunnerBase +class InputMultiplexAllRunner : public MultiplexBaseRunner { public: /** * @brief Constructor which needs to be empty due to plugin semantics */ - InputMultiplexAllRunner() : RunnerBase() + InputMultiplexAllRunner() : MultiplexBaseRunner() { } - /** - * @brief Starter function for starting the action runner - * - * @param node shared pointer to the capabilities node. Allows to use ros node related functionalities - * @param run_config runner configuration loaded from the yaml file - */ - virtual void start(rclcpp::Node::SharedPtr node, const runner_opts& run_config) override - { - init_base(node, run_config); - - info_("started with " + std::to_string(run_config.input_count) + " inputs."); - } - /** * @brief trigger function to handle multiplexing of all inputs based on ALL condition * @@ -35,71 +21,37 @@ class InputMultiplexAllRunner : public RunnerBase */ virtual void trigger(const std::string& parameters) override { - current_inputs_ += 1; + tinyxml2::XMLElement* parameters_ = convert_to_xml(parameters); + + int uid = 0; + int input_count = 0; - if (current_inputs_ == run_config_.input_count) + parameters_->QueryIntAttribute("input_count", &input_count); + parameters_->QueryIntAttribute("uid", &uid); + + if (input_count_tracker.find(uid) == input_count_tracker.end()) { - info_("has fullfilled the All condition with " + std::to_string(current_inputs_) + " inputs."); + input_count_tracker[uid] = 1; + expected_input_count[uid] = input_count; - executionThread = std::thread(&InputMultiplexAllRunner::execution, this, thread_id); - thread_id += 1; + info_("has started the All condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); } else { - info_("only got " + std::to_string(current_inputs_) + "/" + std::to_string(run_config_.input_count) + " inputs."); - } - } + input_count_tracker[uid] += 1; - /** - * @brief Trigger process to be executed. - * - * @param id thread id - */ - virtual void execution(int id) - { - // trigger the events related to on_success state - if (events[execute_id].on_success.interface != "") - { - event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, events[execute_id].on_success.provider); - triggerFunction_(events[execute_id].on_success.interface, - update_on_success(events[execute_id].on_success.parameters)); + info_("has received " + std::to_string(input_count_tracker[uid]) + "/" + + std::to_string(expected_input_count[uid]) + " inputs for ALL condition."); } - // trigger the events related to on_failure state - else if (events[execute_id].on_failure.interface != "") - { - event_(EventType::FAILED, id, events[execute_id].on_failure.interface, events[execute_id].on_failure.provider); - triggerFunction_(events[execute_id].on_failure.interface, - update_on_failure(events[execute_id].on_failure.parameters)); - } - } - - /** - * @brief stop function to cease functionality and shutdown - * - */ - virtual void stop() override - { - // if the node pointer is empty then throw an error - // this means that the runner was not started and is being used out of order - if (!node_) - throw runner_exception("cannot stop runner that was not started"); + if (input_count_tracker[uid] == expected_input_count[uid]) + { + info_("has fullfilled the All condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); - info_("stopping runner"); + executionThreadPool[uid] = std::thread(&InputMultiplexAllRunner::execution, this, uid); + } } - /** - * @brief Destructor - * - * Cleans up the thread if it is still running - */ - ~InputMultiplexAllRunner(); - -private: - /** - * @brief execution thread to handle the execution of the runner - */ - std::thread executionThread; }; } // namespace capabilities2_runner diff --git a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp index 011cbf0..5960a77 100644 --- a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp @@ -1,33 +1,19 @@ #pragma once -#include -#include +#include namespace capabilities2_runner { -class InputMultiplexAnyRunner : public RunnerBase +class InputMultiplexAnyRunner : public MultiplexBaseRunner { public: /** * @brief Constructor which needs to be empty due to plugin semantics */ - InputMultiplexAnyRunner() : RunnerBase() + InputMultiplexAnyRunner() : MultiplexBaseRunner() { } - /** - * @brief Starter function for starting the action runner - * - * @param node shared pointer to the capabilities node. Allows to use ros node related functionalities - * @param run_config runner configuration loaded from the yaml file - */ - virtual void start(rclcpp::Node::SharedPtr node, const runner_opts& run_config) override - { - init_base(node, run_config); - - info_("started with " + std::to_string(run_config.input_count) + " inputs."); - } - /** * @brief trigger function to handle multiplexing of all inputs based on ANY condition * @@ -35,72 +21,37 @@ class InputMultiplexAnyRunner : public RunnerBase */ virtual void trigger(const std::string& parameters) override { - current_inputs_ += 1; + tinyxml2::XMLElement* parameters_ = convert_to_xml(parameters); + + int uid = 0; + int input_count = 0; - if (current_inputs_ > 0) + parameters_->QueryIntAttribute("input_count", &input_count); + parameters_->QueryIntAttribute("uid", &uid); + + if (input_count_tracker.find(uid) == input_count_tracker.end()) { - info_("has fullfilled the ANY condition with " + std::to_string(current_inputs_) + " inputs."); + input_count_tracker[uid] = 1; + expected_input_count[uid] = input_count; - executionThread = std::thread(&InputMultiplexAnyRunner::execution, this, thread_id); - thread_id += 1; + info_("has started the ANY condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); } else { - info_("only got " + std::to_string(current_inputs_) + "/" + std::to_string(run_config_.input_count) + " inputs."); - } - } + input_count_tracker[uid] += 1; - /** - * @brief Trigger process to be executed. - * - * @param id thread id - */ - virtual void execution(int id) - { - // trigger the events related to on_success state - if (events[execute_id].on_success.interface != "") - { - event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, events[execute_id].on_success.provider); - triggerFunction_(events[execute_id].on_success.interface, - update_on_success(events[execute_id].on_success.parameters)); + info_("has received " + std::to_string(input_count_tracker[uid]) + "/" + + std::to_string(expected_input_count[uid]) + " inputs for ANY condition."); } - // trigger the events related to on_failure state - else if (events[execute_id].on_failure.interface != "") + if (input_count_tracker[uid] > 0) { - event_(EventType::FAILED, id, events[execute_id].on_failure.interface, events[execute_id].on_failure.provider); - triggerFunction_(events[execute_id].on_failure.interface, - update_on_failure(events[execute_id].on_failure.parameters)); - } - } - - /** - * @brief stop function to cease functionality and shutdown - * - */ - virtual void stop() override - { - // if the node pointer is empty then throw an error - // this means that the runner was not started and is being used out of order - - if (!node_) - throw runner_exception("cannot stop runner that was not started"); + info_("has fullfilled the ANY condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); - info_("stopping runner"); + executionThreadPool[uid] = std::thread(&InputMultiplexAnyRunner::execution, this, uid); + } } - /** - * @brief Destructor - * - * Cleans up the thread if it is still running - */ - ~InputMultiplexAnyRunner(); - -private: - /** - * @brief execution thread to handle the execution of the runner - */ - std::thread executionThread; }; } // namespace capabilities2_runner diff --git a/capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp new file mode 100644 index 0000000..c02739d --- /dev/null +++ b/capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp @@ -0,0 +1,122 @@ +#pragma once + +#include +#include +#include +#include + +namespace capabilities2_runner +{ +class MultiplexBaseRunner : public RunnerBase +{ +public: + /** + * @brief Constructor which needs to be empty due to plugin semantics + */ + MultiplexBaseRunner() : RunnerBase() + { + } + + /** + * @brief Starter function for starting the action runner + * + * @param node shared pointer to the capabilities node. Allows to use ros node related functionalities + * @param run_config runner configuration loaded from the yaml file + */ + virtual void start(rclcpp::Node::SharedPtr node, const runner_opts& run_config) override + { + init_base(node, run_config); + } + + /** + * @brief Trigger process to be executed. + * + * @param uid unique identifier for the execution + */ + virtual void execution(int uid) + { + info_("execution started for uid: " + std::to_string(uid)); + + // trigger the events related to on_success state + if (events[uid].on_success.interface != "") + { + event_(EventType::SUCCEEDED, uid, events[uid].on_success.interface, events[uid].on_success.provider); + triggerFunction_(events[uid].on_success.interface, update_on_success(events[uid].on_success.parameters)); + } + // trigger the events related to on_failure state + else if (events[uid].on_failure.interface != "") + { + event_(EventType::FAILED, uid, events[uid].on_failure.interface, events[uid].on_failure.provider); + triggerFunction_(events[uid].on_failure.interface, update_on_failure(events[uid].on_failure.parameters)); + } + } + + /** + * @brief attach events to the runner + * + * @param event_option event_options related for the action + * @param triggerFunction external function that triggers capability runners + * + * @return number of attached events + */ + virtual int attach_events(event_logger::event_opts& event_option, + std::function triggerFunction) override + { + info_("accepted event options with ID : " + std::to_string(insert_id)); + + triggerFunction_ = triggerFunction; + + tinyxml2::XMLElement* on_success_params = convert_to_xml(event_option.on_success.parameters); + + int uid = NULL; + + // extract the uid from the event options from whatever runner is present by looping + if (event_option.on_success.interface != "") + { + tinyxml2::XMLElement* on_success_params = convert_to_xml(event_option.on_success.parameters); + on_success_params->QueryIntAttribute("uid", &uid); + } + else if (event_option.on_failure.interface != "") + { + tinyxml2::XMLElement* on_failure_params = convert_to_xml(event_option.on_failure.parameters); + on_failure_params->QueryIntAttribute("uid", &uid); + } + else if (event_option.on_started.interface != "") + { + tinyxml2::XMLElement* on_started_params = convert_to_xml(event_option.on_started.parameters); + on_started_params->QueryIntAttribute("uid", &uid); + } + else if (event_option.on_stopped.interface != "") + { + tinyxml2::XMLElement* on_stopped_params = convert_to_xml(event_option.on_stopped.parameters); + on_stopped_params->QueryIntAttribute("uid", &uid); + } + + events[uid] = event_option; + + return uid; + } + + /** + * @brief stop function to cease functionality and shutdown + * + */ + virtual void stop() override + { + // if the node pointer is empty then throw an error + // this means that the runner was not started and is being used out of order + + if (!node_) + throw runner_exception("cannot stop runner that was not started"); + + info_("stopping runner"); + } + +protected: + // input count tracker + std::map input_count_tracker; + + // expected input count + std::map expected_input_count; +}; +} // namespace capabilities2_runner \ No newline at end of file diff --git a/capabilities2_server/include/capabilities2_server/capabilities_api.hpp b/capabilities2_server/include/capabilities2_server/capabilities_api.hpp index 9901150..97b8b24 100644 --- a/capabilities2_server/include/capabilities2_server/capabilities_api.hpp +++ b/capabilities2_server/include/capabilities2_server/capabilities_api.hpp @@ -69,12 +69,10 @@ class CapabilitiesAPI * @param node ros node pointer of the ros server * @param capability capability name to be started * @param provider provider of the capability - * @param input_count number of inputs for the capability * * @return `true` if capability started successfully. else returns `false` */ - bool start_capability(rclcpp::Node::SharedPtr node, const std::string& capability, const std::string& provider, - int input_count= 0) + bool start_capability(rclcpp::Node::SharedPtr node, const std::string& capability, const std::string& provider) { // return value bool value = true; @@ -105,7 +103,7 @@ class CapabilitiesAPI // TODO: consider the logic for multiple runners per capability try { - runner_cache_.add_runner(node, capability, run_config, input_count); + runner_cache_.add_runner(node, capability, run_config); event_->info("started capability: " + capability + " with provider: " + provider); @@ -114,6 +112,7 @@ class CapabilitiesAPI catch (const capabilities2_runner::runner_exception& e) { event_->error("could not start runner: " + std::string(e.what())); + return false; } } @@ -222,19 +221,18 @@ class CapabilitiesAPI * @param node ros node pointer of the ros server * @param capability capability name to be started * @param provider provider of the capability - * @param input_count number of inputs for the capability * @param bond_id bond_id for the capability * * @return `true` if capability started successfully. else returns `false` */ bool use_capability(rclcpp::Node::SharedPtr node, const std::string& capability, const std::string& provider, - int input_count, const std::string& bond_id) + const std::string& bond_id) { // add bond to cache for capability bond_cache_.add_bond(capability, bond_id); // start the capability with the provider - return start_capability(node, capability, provider, input_count); + return start_capability(node, capability, provider); } /** diff --git a/capabilities2_server/include/capabilities2_server/capabilities_server.hpp b/capabilities2_server/include/capabilities2_server/capabilities_server.hpp index 04d6397..32528b6 100644 --- a/capabilities2_server/include/capabilities2_server/capabilities_server.hpp +++ b/capabilities2_server/include/capabilities2_server/capabilities_server.hpp @@ -308,14 +308,8 @@ class CapabilitiesServer : public rclcpp::Node, public CapabilitiesAPI return; } - if (req->input_count > 0) - { - event_->error("use_capability: input count should be non zero"); - return; - } - // use capability with this bond - use_capability(shared_from_this(), req->capability, req->preferred_provider, req->input_count, req->bond_id); + use_capability(shared_from_this(), req->capability, req->preferred_provider, req->bond_id); // response is empty } diff --git a/capabilities2_server/include/capabilities2_server/runner_cache.hpp b/capabilities2_server/include/capabilities2_server/runner_cache.hpp index 4a21f21..4c1aac1 100644 --- a/capabilities2_server/include/capabilities2_server/runner_cache.hpp +++ b/capabilities2_server/include/capabilities2_server/runner_cache.hpp @@ -50,14 +50,12 @@ class RunnerCache * @param run_config run_config of the runner to be loaded */ void add_runner(rclcpp::Node::SharedPtr node, const std::string& capability, - const models::run_config_model_t& run_config, int input_count = 0) + const models::run_config_model_t& run_config) { // if the runner exists then throw an error preserving uniqueness - if (!is_system_capability(capability) && running(capability)) + if (running(capability)) { - // already running throw capabilities2_runner::runner_exception("capability is running already: " + capability); - // return; } // check if run config is valid @@ -73,21 +71,13 @@ class RunnerCache { runner_cache_[capability] = runner_loader_.createSharedInstance("capabilities2_runner::LaunchRunner"); } - else if (is_system_capability(capability)) - { - system_runner_cache_[capability].push_back(runner_loader_.createSharedInstance(run_config.runner)); - } else { runner_cache_[capability] = runner_loader_.createSharedInstance(run_config.runner); } - // add input count to the runner options - capabilities2_runner::runner_opts opts = run_config.to_runner_opts(); - opts.input_count = input_count; - // start the runner - runner_cache_[capability]->start(node, opts); + runner_cache_[capability]->start(node, run_config.to_runner_opts()); } /** @@ -124,9 +114,9 @@ class RunnerCache */ void set_runner_triggers(const std::string& capability, event_logger::event_opts& event_options) { - int event_count = runner_cache_[capability]->attach_events( - event_options, std::bind(&capabilities2_server::RunnerCache::trigger_runner, this, std::placeholders::_1, - std::placeholders::_2)); + runner_cache_[capability]->attach_events(event_options, + std::bind(&capabilities2_server::RunnerCache::trigger_runner, this, + std::placeholders::_1, std::placeholders::_2)); } /** @@ -136,30 +126,10 @@ class RunnerCache */ void remove_runner(const std::string& capability) { - if (is_system_capability(capability)) - { - // remove from system runner cache - auto it = system_runner_cache_.find(capability); - - // loop through runners and stop them - if (it != system_runner_cache_.end()) - { - for (auto& runner : it->second) - if (runner->get_completion_status()) - runner->stop(); - - // system_runner_cache_.erase(it); - } - } - - // ---- non-system: preserve your current unique semantics ---- - - // find the runner in the cache + // find the runner in the cache and if not found then throw an error if (!running(capability)) { - // not found so nothing to do throw capabilities2_runner::runner_exception("capability runner not found: " + capability); - // return; } // safely stop the runner @@ -249,27 +219,11 @@ class RunnerCache return runner_cache_.find(capability) != runner_cache_.end(); } - /** - * @brief Check if a capability is a system capability - * - * @param cap capability name - * @return true if it is a system capability - * @return false otherwise - */ - bool is_system_capability(const std::string& capability) - { - // (future-proof: new system runners still match) - return capability.rfind("system_capabilities/", 0) == 0; - } - private: // map capability to running model // capability / provider specs -> runner std::map> runner_cache_; - // system runner cache that allows duplicates - std::map>> system_runner_cache_; - // runner plugin loader pluginlib::ClassLoader runner_loader_; diff --git a/capabilities2_utils/include/capabilities2_utils/connection.hpp b/capabilities2_utils/include/capabilities2_utils/connection.hpp index d6b68dc..c88ede3 100644 --- a/capabilities2_utils/include/capabilities2_utils/connection.hpp +++ b/capabilities2_utils/include/capabilities2_utils/connection.hpp @@ -16,7 +16,6 @@ namespace capabilities2 { std::string runner = ""; std::string provider = ""; - int input_count = 0; tinyxml2::XMLElement* parameters = nullptr; }; diff --git a/system_capabilities/interfaces/CompletionRunner.yaml b/system_capabilities/interfaces/CompletionRunner.yaml index 903af66..d34d09f 100644 --- a/system_capabilities/interfaces/CompletionRunner.yaml +++ b/system_capabilities/interfaces/CompletionRunner.yaml @@ -8,6 +8,6 @@ description: "This capability notifies the completion of the capabilities fabric fabric. A decision making authority such as an LLM does not need to include this in plans generated by it." interface: actions: - "/capabilities_fabric/set_completion": + "/fabric/set_completion": type: "capabilities2_msgs::srv::CompleteFabric" description: "Fabric completion notifier interface of the Capabilities fabric" \ No newline at end of file diff --git a/system_capabilities/interfaces/InputMultiplexAllRunner.yaml b/system_capabilities/interfaces/InputMultiplexAllRunner.yaml index a587da6..0ab62cc 100644 --- a/system_capabilities/interfaces/InputMultiplexAllRunner.yaml +++ b/system_capabilities/interfaces/InputMultiplexAllRunner.yaml @@ -5,4 +5,9 @@ spec_version: 1.1 spec_type: interface description: "This capability combines the results of all input by multiplexing events into a single interface. It allows the robot to wait or multiple parallel processes at once until completion. This is inserted by the system itself and a decision making authority such - as an LLM does not need to include this in plans generated by it." \ No newline at end of file + as an LLM does not need to include this in plans generated by it." +interface: + actions: + "empty": + type: std_srvs/action/Empty + description: empty. not used \ No newline at end of file diff --git a/system_capabilities/interfaces/InputMultiplexAnyRunner.yaml b/system_capabilities/interfaces/InputMultiplexAnyRunner.yaml index 3b06509..ff4313b 100644 --- a/system_capabilities/interfaces/InputMultiplexAnyRunner.yaml +++ b/system_capabilities/interfaces/InputMultiplexAnyRunner.yaml @@ -5,4 +5,9 @@ spec_version: 1.1 spec_type: interface description: "This capability combines the results of any (at least one) input by multiplexing events into a single interface. It allows the robot to wait or multiple parallel processes at once until completion. This is inserted by the system itself and a decision - making authority such as an LLM does not need to include this in plans generated by it." \ No newline at end of file + making authority such as an LLM does not need to include this in plans generated by it." +interface: + actions: + "empty": + type: std_srvs/action/Empty + description: empty. not used \ No newline at end of file diff --git a/system_capabilities/providers/CompletionRunner.yaml b/system_capabilities/providers/CompletionRunner.yaml index ca84100..3fb38e6 100644 --- a/system_capabilities/providers/CompletionRunner.yaml +++ b/system_capabilities/providers/CompletionRunner.yaml @@ -3,6 +3,6 @@ name: CompletionRunner spec_type: provider spec_version: 1.1 -description: "The capability provider for the /capabilities_fabric/set_completion interface" +description: "The capability provider for the /fabric/set_completion interface" implements: system_capabilities/CompletionRunner runner: capabilities2_runner::CompletionRunner From 4657c750ee412153849e6793ecb5a80da90b0687 Mon Sep 17 00:00:00 2001 From: Kalana Ratnayake Date: Sun, 17 Aug 2025 20:32:43 +1000 Subject: [PATCH 4/5] updated event system to be compatible with multiplexing runners used in recovery and parallel routiens --- .../srv/ConfigureCapability.srv | 1 + capabilities2_runner/CMakeLists.txt | 2 + .../capabilities2_runner/action_runner.hpp | 37 ++++----- .../capabilities2_runner/runner_base.hpp | 53 +++++------- .../capabilities2_runner/service_runner.hpp | 35 ++++---- .../capabilities2_runner/topic_runner.hpp | 32 +++----- capabilities2_runner/package.xml | 1 + capabilities2_runner_audio/CMakeLists.txt | 2 + capabilities2_runner_audio/package.xml | 1 + capabilities2_runner_bt/CMakeLists.txt | 2 + capabilities2_runner_bt/package.xml | 1 + .../CMakeLists.txt | 2 + capabilities2_runner_fabric/CMakeLists.txt | 2 + capabilities2_runner_nav2/CMakeLists.txt | 2 + capabilities2_runner_prompt/CMakeLists.txt | 2 + capabilities2_runner_system/CMakeLists.txt | 2 + .../input_multiplex_all_runner.hpp | 24 +++--- .../input_multiplex_any_runner.hpp | 37 +++++---- .../multiplex_base_runner.hpp | 81 ++++++------------- capabilities2_runner_system/package.xml | 1 + capabilities2_server/CMakeLists.txt | 3 + .../capabilities2_server/capabilities_api.hpp | 4 +- .../capabilities_server.hpp | 13 ++- .../capabilities2_server/runner_cache.hpp | 2 +- capabilities2_server/package.xml | 1 + .../capabilities2_utils/connection.hpp | 1 + .../capabilities2_utils/event_types.hpp | 47 +++++++++++ 27 files changed, 209 insertions(+), 182 deletions(-) create mode 100644 capabilities2_utils/include/capabilities2_utils/event_types.hpp diff --git a/capabilities2_msgs/srv/ConfigureCapability.srv b/capabilities2_msgs/srv/ConfigureCapability.srv index 726dab7..b9e6905 100644 --- a/capabilities2_msgs/srv/ConfigureCapability.srv +++ b/capabilities2_msgs/srv/ConfigureCapability.srv @@ -4,4 +4,5 @@ Capability target_on_stop Capability target_on_success Capability target_on_failure string connection_description +int32 trigger_id --- \ No newline at end of file diff --git a/capabilities2_runner/CMakeLists.txt b/capabilities2_runner/CMakeLists.txt index 74610f5..6021b30 100644 --- a/capabilities2_runner/CMakeLists.txt +++ b/capabilities2_runner/CMakeLists.txt @@ -15,6 +15,7 @@ find_package(rclcpp REQUIRED) find_package(rclcpp_action REQUIRED) find_package(pluginlib REQUIRED) find_package(capabilities2_msgs REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(tinyxml2_vendor REQUIRED) @@ -33,6 +34,7 @@ ament_target_dependencies(${PROJECT_NAME} rclcpp_action pluginlib capabilities2_msgs + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_runner/include/capabilities2_runner/action_runner.hpp b/capabilities2_runner/include/capabilities2_runner/action_runner.hpp index f4c88c8..de2786a 100644 --- a/capabilities2_runner/include/capabilities2_runner/action_runner.hpp +++ b/capabilities2_runner/include/capabilities2_runner/action_runner.hpp @@ -91,12 +91,11 @@ class ActionRunner : public RunnerBase } // Trigger on_stopped event if defined - if (events[execute_id].on_stopped.interface != "") + if (events[runner_id].on_stopped.interface != "") { - event_(EventType::STOPPED, -1, events[execute_id].on_stopped.interface, - events[execute_id].on_stopped.provider); - triggerFunction_(events[execute_id].on_stopped.interface, - update_on_stopped(events[execute_id].on_stopped.parameters)); + event_(EventType::STOPPED, -1, events[runner_id].on_stopped.interface, events[runner_id].on_stopped.provider); + triggerFunction_(events[runner_id].on_stopped.interface, + update_on_stopped(events[runner_id].on_stopped.parameters)); } }); @@ -129,8 +128,6 @@ class ActionRunner : public RunnerBase */ virtual void execution(int id) override { - execute_id += 1; - // if parameters are not provided then cannot proceed if (!parameters_[id]) throw runner_exception("cannot trigger action without parameters"); @@ -138,7 +135,7 @@ class ActionRunner : public RunnerBase // generate a goal from parameters if provided goal_msg_ = generate_goal(parameters_[id], id); - info_("goal generated", id); + info_("goal generated for event ", id); std::unique_lock lock(mutex_); completed_ = false; @@ -151,12 +148,10 @@ class ActionRunner : public RunnerBase info_("goal accepted. Waiting for result", id); // trigger the events related to on_started state - if (events[execute_id].on_started.interface != "") + if (events[id].on_started.interface != "") { - event_(EventType::STARTED, id, events[execute_id].on_started.interface, - events[execute_id].on_started.provider); - triggerFunction_(events[execute_id].on_started.interface, - update_on_started(events[execute_id].on_started.parameters)); + event_(EventType::STARTED, id, events[id].on_started.interface, events[id].on_started.provider); + triggerFunction_(events[id].on_started.interface, update_on_started(events[id].on_started.parameters)); } } else @@ -187,12 +182,10 @@ class ActionRunner : public RunnerBase info_("action succeeded.", id); // trigger the events related to on_success state - if (events[execute_id].on_success.interface != "") + if (events[id].on_success.interface != "") { - event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, - events[execute_id].on_success.provider); - triggerFunction_(events[execute_id].on_success.interface, - update_on_success(events[execute_id].on_success.parameters)); + event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider); + triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters)); } } else @@ -200,12 +193,10 @@ class ActionRunner : public RunnerBase error_("action failed", id); // trigger the events related to on_failure state - if (events[execute_id].on_failure.interface != "") + if (events[id].on_failure.interface != "") { - event_(EventType::FAILED, id, events[execute_id].on_failure.interface, - events[execute_id].on_failure.provider); - triggerFunction_(events[execute_id].on_failure.interface, - update_on_failure(events[execute_id].on_failure.parameters)); + event_(EventType::FAILED, id, events[id].on_failure.interface, events[id].on_failure.provider); + triggerFunction_(events[id].on_failure.interface, update_on_failure(events[id].on_failure.parameters)); } } diff --git a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp index a9115bf..65d8659 100644 --- a/capabilities2_runner/include/capabilities2_runner/runner_base.hpp +++ b/capabilities2_runner/include/capabilities2_runner/runner_base.hpp @@ -7,9 +7,11 @@ #include #include #include + +#include + #include #include -#include #include namespace capabilities2_runner @@ -68,7 +70,7 @@ class RunnerBase { public: using Event = event_logger_msgs::msg::Event; - using EventType = event_logger::event_t; + using EventType = capabilities2::event_t; RunnerBase() : run_config_() { @@ -106,15 +108,18 @@ class RunnerBase */ virtual void trigger(const std::string& parameters) { - info_("received new parameters", thread_id); + // extract the unique id for the runner and use that as the thread id + tinyxml2::XMLElement * element = nullptr; + element = convert_to_xml(parameters); + element->QueryIntAttribute("id", &runner_id); - parameters_[thread_id] = convert_to_xml(parameters); + parameters_[runner_id] = element; - executionThreadPool[thread_id] = std::thread(&RunnerBase::execution, this, thread_id); + info_("received new parameters with event id", runner_id); - info_("started execution", thread_id); + executionThreadPool[runner_id] = std::thread(&RunnerBase::execution, this, runner_id); - thread_id += 1; + info_("started execution", runner_id); } /** @@ -129,10 +134,8 @@ class RunnerBase node_ = node; run_config_ = run_config; - insert_id = 0; - execute_id = -1; - thread_id = 0; current_inputs_ = 0; + runner_id = 0; event_client_ = std::make_shared(node_, "runner", "/events"); } @@ -145,17 +148,14 @@ class RunnerBase * * @return number of attached events */ - virtual int attach_events(event_logger::event_opts& event_option, + virtual void attach_events(capabilities2::event_opts& event_option, std::function triggerFunction) { - info_("accepted event options with ID : " + std::to_string(insert_id)); + info_("accepted event options with ID : " + std::to_string(event_option.event_id)); triggerFunction_ = triggerFunction; - events[insert_id] = event_option; - insert_id += 1; - - return insert_id; + events[event_option.event_id] = event_option; } /** @@ -213,8 +213,9 @@ class RunnerBase * @brief Trigger process to be executed. * * This method utilizes paramters set via the trigger() function - * - * @param parameters pointer to tinyxml2::XMLElement that contains parameters + * + * @param id unique identifier for the runner id. used to track the correct + * triggers and subsequent events. * */ virtual void execution(int id) = 0; @@ -571,22 +572,12 @@ class RunnerBase /** * @brief dictionary of events */ - std::map events; - - /** - * @brief Last event tracker id to be inserted - */ - int insert_id; - - /** - * @brief Last parameter tracker id to be executed - */ - int execute_id; + std::map events; /** - * @brief Last parameter tracker id to be executed + * @brief unique id for the runner */ - int thread_id; + int runner_id; /** * @brief curent number of trigger signals received diff --git a/capabilities2_runner/include/capabilities2_runner/service_runner.hpp b/capabilities2_runner/include/capabilities2_runner/service_runner.hpp index d0017ef..5238c0b 100644 --- a/capabilities2_runner/include/capabilities2_runner/service_runner.hpp +++ b/capabilities2_runner/include/capabilities2_runner/service_runner.hpp @@ -61,8 +61,6 @@ class ServiceRunner : public RunnerBase */ virtual void execution(int id) override { - execute_id += 1; - // if parameters are not provided then cannot proceed if (!parameters_[id]) throw runner_exception("cannot trigger service without parameters"); @@ -70,7 +68,7 @@ class ServiceRunner : public RunnerBase // generate a goal from parameters if provided auto request_msg = std::make_shared(generate_request(parameters_[id], id)); - info_("request generated", id); + info_("request generated for event :", id); std::unique_lock lock(mutex_); completed_ = false; @@ -82,12 +80,10 @@ class ServiceRunner : public RunnerBase error_("get result call failed"); // trigger the events related to on_failure state - if (events[execute_id].on_failure.interface != "") + if (events[id].on_failure.interface != "") { - event_(EventType::FAILED, id, events[execute_id].on_failure.interface, - events[execute_id].on_failure.provider); - triggerFunction_(events[execute_id].on_failure.interface, - update_on_failure(events[execute_id].on_failure.parameters)); + event_(EventType::FAILED, id, events[id].on_failure.interface, events[id].on_failure.provider); + triggerFunction_(events[id].on_failure.interface, update_on_failure(events[id].on_failure.parameters)); } } else @@ -98,12 +94,10 @@ class ServiceRunner : public RunnerBase process_response(response_, id); // trigger the events related to on_success state - if (events[execute_id].on_success.interface != "") + if (events[id].on_success.interface != "") { - event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, - events[execute_id].on_success.provider); - triggerFunction_(events[execute_id].on_success.interface, - update_on_success(events[execute_id].on_success.parameters)); + event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider); + triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters)); } } @@ -112,11 +106,10 @@ class ServiceRunner : public RunnerBase }); // trigger the events related to on_started state - if (events[execute_id].on_started.interface != "") + if (events[id].on_started.interface != "") { - event_(EventType::STARTED, id, events[execute_id].on_started.interface, events[execute_id].on_started.provider); - triggerFunction_(events[execute_id].on_started.interface, - update_on_started(events[execute_id].on_started.parameters)); + event_(EventType::STARTED, id, events[id].on_started.interface, events[id].on_started.provider); + triggerFunction_(events[id].on_started.interface, update_on_started(events[id].on_started.parameters)); } // Conditional wait @@ -143,11 +136,11 @@ class ServiceRunner : public RunnerBase throw runner_exception("cannot stop runner action that was not started"); // Trigger on_stopped event if defined - if (events[execute_id].on_stopped.interface != "") + if (events[runner_id].on_stopped.interface != "") { - event_(EventType::STOPPED, -1, events[execute_id].on_stopped.interface, events[execute_id].on_stopped.provider); - triggerFunction_(events[execute_id].on_stopped.interface, - update_on_stopped(events[execute_id].on_stopped.parameters)); + event_(EventType::STOPPED, -1, events[runner_id].on_stopped.interface, events[runner_id].on_stopped.provider); + triggerFunction_(events[runner_id].on_stopped.interface, + update_on_stopped(events[runner_id].on_stopped.parameters)); } info_("stopping runner"); diff --git a/capabilities2_runner/include/capabilities2_runner/topic_runner.hpp b/capabilities2_runner/include/capabilities2_runner/topic_runner.hpp index f2a6893..0fb84ce 100644 --- a/capabilities2_runner/include/capabilities2_runner/topic_runner.hpp +++ b/capabilities2_runner/include/capabilities2_runner/topic_runner.hpp @@ -51,18 +51,15 @@ class TopicRunner : public RunnerBase */ virtual void execution(int id) override { - execute_id += 1; - // if parameters are not provided then cannot proceed if (!parameters_[id]) throw runner_exception("cannot grab data without parameters"); // trigger the events related to on_started state - if (events[execute_id].on_started.interface != "") + if (events[id].on_started.interface != "") { - event_(EventType::STARTED, id, events[execute_id].on_started.interface, events[execute_id].on_started.provider); - triggerFunction_(events[execute_id].on_started.interface, - update_on_started(events[execute_id].on_started.parameters)); + event_(EventType::STARTED, id, events[id].on_started.interface, events[id].on_started.provider); + triggerFunction_(events[id].on_started.interface, update_on_started(events[id].on_started.parameters)); } std::unique_lock lock(mutex_); @@ -77,12 +74,10 @@ class TopicRunner : public RunnerBase if (latest_message_) { // trigger the events related to on_success state - if (events[execute_id].on_success.interface != "") + if (events[id].on_success.interface != "") { - event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, - events[execute_id].on_success.provider); - triggerFunction_(events[execute_id].on_success.interface, - update_on_success(events[execute_id].on_success.parameters)); + event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider); + triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters)); } } else @@ -90,11 +85,10 @@ class TopicRunner : public RunnerBase error_("Message receving failed."); // trigger the events related to on_failure state - if (events[execute_id].on_failure.interface != "") + if (events[id].on_failure.interface != "") { - event_(EventType::FAILED, id, events[execute_id].on_failure.interface, events[execute_id].on_failure.provider); - triggerFunction_(events[execute_id].on_failure.interface, - update_on_failure(events[execute_id].on_failure.parameters)); + event_(EventType::FAILED, id, events[id].on_failure.interface, events[id].on_failure.provider); + triggerFunction_(events[id].on_failure.interface, update_on_failure(events[id].on_failure.parameters)); } } @@ -120,11 +114,11 @@ class TopicRunner : public RunnerBase throw runner_exception("cannot stop runner subscriber that was not started"); // Trigger on_stopped event if defined - if (events[execute_id].on_stopped.interface != "") + if (events[runner_id].on_stopped.interface != "") { - event_(EventType::STOPPED, -1, events[execute_id].on_stopped.interface, events[execute_id].on_stopped.provider); - triggerFunction_(events[execute_id].on_stopped.interface, - update_on_stopped(events[execute_id].on_stopped.parameters)); + event_(EventType::STOPPED, -1, events[runner_id].on_stopped.interface, events[runner_id].on_stopped.provider); + triggerFunction_(events[runner_id].on_stopped.interface, + update_on_stopped(events[runner_id].on_stopped.parameters)); } info_("stopping runner"); diff --git a/capabilities2_runner/package.xml b/capabilities2_runner/package.xml index 248120c..f33f924 100644 --- a/capabilities2_runner/package.xml +++ b/capabilities2_runner/package.xml @@ -21,6 +21,7 @@ pluginlib std_msgs capabilities2_msgs + capabilities2_utils event_logger event_logger_msgs tinyxml2_vendor diff --git a/capabilities2_runner_audio/CMakeLists.txt b/capabilities2_runner_audio/CMakeLists.txt index 14c53d0..c03dd00 100644 --- a/capabilities2_runner_audio/CMakeLists.txt +++ b/capabilities2_runner_audio/CMakeLists.txt @@ -18,6 +18,7 @@ find_package(pluginlib REQUIRED) find_package(hri_audio_msgs REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(tinyxml2_vendor REQUIRED) @@ -38,6 +39,7 @@ ament_target_dependencies(${PROJECT_NAME} hri_audio_msgs capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_runner_audio/package.xml b/capabilities2_runner_audio/package.xml index 787289b..edccb72 100644 --- a/capabilities2_runner_audio/package.xml +++ b/capabilities2_runner_audio/package.xml @@ -21,6 +21,7 @@ hri_audio_msgs capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs tinyxml2_vendor diff --git a/capabilities2_runner_bt/CMakeLists.txt b/capabilities2_runner_bt/CMakeLists.txt index 1052e22..25bace3 100644 --- a/capabilities2_runner_bt/CMakeLists.txt +++ b/capabilities2_runner_bt/CMakeLists.txt @@ -16,6 +16,7 @@ find_package(rclcpp_action REQUIRED) find_package(pluginlib REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(behaviortree_cpp REQUIRED) @@ -36,6 +37,7 @@ ament_target_dependencies(${PROJECT_NAME} pluginlib capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs behaviortree_cpp diff --git a/capabilities2_runner_bt/package.xml b/capabilities2_runner_bt/package.xml index d41e1c7..07668d9 100644 --- a/capabilities2_runner_bt/package.xml +++ b/capabilities2_runner_bt/package.xml @@ -21,6 +21,7 @@ std_msgs capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs tinyxml2_vendor diff --git a/capabilities2_runner_capabilities/CMakeLists.txt b/capabilities2_runner_capabilities/CMakeLists.txt index 57153c2..6dc43ee 100644 --- a/capabilities2_runner_capabilities/CMakeLists.txt +++ b/capabilities2_runner_capabilities/CMakeLists.txt @@ -16,6 +16,7 @@ find_package(rclcpp_action REQUIRED) find_package(pluginlib REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(tinyxml2_vendor REQUIRED) @@ -35,6 +36,7 @@ ament_target_dependencies(${PROJECT_NAME} pluginlib capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_runner_fabric/CMakeLists.txt b/capabilities2_runner_fabric/CMakeLists.txt index 15dc76e..b813db7 100644 --- a/capabilities2_runner_fabric/CMakeLists.txt +++ b/capabilities2_runner_fabric/CMakeLists.txt @@ -17,6 +17,7 @@ find_package(pluginlib REQUIRED) find_package(fabric_msgs REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(tinyxml2_vendor REQUIRED) @@ -37,6 +38,7 @@ ament_target_dependencies(${PROJECT_NAME} fabric_msgs capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_runner_nav2/CMakeLists.txt b/capabilities2_runner_nav2/CMakeLists.txt index 61d91a3..01483a7 100644 --- a/capabilities2_runner_nav2/CMakeLists.txt +++ b/capabilities2_runner_nav2/CMakeLists.txt @@ -20,6 +20,7 @@ find_package(tf2_ros REQUIRED) find_package(geometry_msgs REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(tinyxml2_vendor REQUIRED) @@ -43,6 +44,7 @@ ament_target_dependencies(${PROJECT_NAME} geometry_msgs capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_runner_prompt/CMakeLists.txt b/capabilities2_runner_prompt/CMakeLists.txt index b0a509c..8df7df5 100644 --- a/capabilities2_runner_prompt/CMakeLists.txt +++ b/capabilities2_runner_prompt/CMakeLists.txt @@ -17,6 +17,7 @@ find_package(pluginlib REQUIRED) find_package(prompt_msgs REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(tinyxml2_vendor REQUIRED) @@ -37,6 +38,7 @@ ament_target_dependencies(${PROJECT_NAME} prompt_msgs capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_runner_system/CMakeLists.txt b/capabilities2_runner_system/CMakeLists.txt index 6f4d9c3..9d56357 100644 --- a/capabilities2_runner_system/CMakeLists.txt +++ b/capabilities2_runner_system/CMakeLists.txt @@ -16,6 +16,7 @@ find_package(rclcpp_action REQUIRED) find_package(pluginlib REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(fabric_msgs REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) @@ -37,6 +38,7 @@ ament_target_dependencies(${PROJECT_NAME} capabilities2_msgs fabric_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp index 6def71a..5bc8cac 100644 --- a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_all_runner.hpp @@ -23,35 +23,33 @@ class InputMultiplexAllRunner : public MultiplexBaseRunner { tinyxml2::XMLElement* parameters_ = convert_to_xml(parameters); - int uid = 0; int input_count = 0; parameters_->QueryIntAttribute("input_count", &input_count); - parameters_->QueryIntAttribute("uid", &uid); + parameters_->QueryIntAttribute("id", &runner_id); - if (input_count_tracker.find(uid) == input_count_tracker.end()) + if (input_count_tracker.find(runner_id) == input_count_tracker.end()) { - input_count_tracker[uid] = 1; - expected_input_count[uid] = input_count; + input_count_tracker[runner_id] = 1; + expected_input_count[runner_id] = input_count; - info_("has started the All condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); + info_("has started the All condition with " + std::to_string(input_count_tracker[runner_id]) + " inputs."); } else { - input_count_tracker[uid] += 1; + input_count_tracker[runner_id] += 1; - info_("has received " + std::to_string(input_count_tracker[uid]) + "/" + - std::to_string(expected_input_count[uid]) + " inputs for ALL condition."); + info_("has received " + std::to_string(input_count_tracker[runner_id]) + "/" + + std::to_string(expected_input_count[runner_id]) + " inputs for ALL condition."); } - if (input_count_tracker[uid] == expected_input_count[uid]) + if (input_count_tracker[runner_id] == expected_input_count[runner_id]) { - info_("has fullfilled the All condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); + info_("has fullfilled the All condition with " + std::to_string(input_count_tracker[runner_id]) + " inputs."); - executionThreadPool[uid] = std::thread(&InputMultiplexAllRunner::execution, this, uid); + executionThreadPool[runner_id] = std::thread(&InputMultiplexAllRunner::execution, this, runner_id); } } - }; } // namespace capabilities2_runner diff --git a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp index 5960a77..dfc919f 100644 --- a/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/input_multiplex_any_runner.hpp @@ -21,37 +21,46 @@ class InputMultiplexAnyRunner : public MultiplexBaseRunner */ virtual void trigger(const std::string& parameters) override { + info_("received new parameters for InputMultiplexAnyRunner : " + parameters); + tinyxml2::XMLElement* parameters_ = convert_to_xml(parameters); - int uid = 0; - int input_count = 0; + int input_count = -1; parameters_->QueryIntAttribute("input_count", &input_count); - parameters_->QueryIntAttribute("uid", &uid); + parameters_->QueryIntAttribute("id", &runner_id); + + if (runner_id < 0 || input_count < 0) + { + throw runner_exception("UID or input_count not found in parameters"); + } + else + { + info_("triggered with UID: " + std::to_string(runner_id) + " and input_count: " + std::to_string(input_count)); + } - if (input_count_tracker.find(uid) == input_count_tracker.end()) + if (input_count_tracker.find(runner_id) == input_count_tracker.end()) { - input_count_tracker[uid] = 1; - expected_input_count[uid] = input_count; + input_count_tracker[runner_id] = 1; + expected_input_count[runner_id] = input_count; - info_("has started the ANY condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); + info_("has started the ANY condition with " + std::to_string(input_count_tracker[runner_id]) + " inputs."); } else { - input_count_tracker[uid] += 1; + input_count_tracker[runner_id] += 1; - info_("has received " + std::to_string(input_count_tracker[uid]) + "/" + - std::to_string(expected_input_count[uid]) + " inputs for ANY condition."); + info_("has received " + std::to_string(input_count_tracker[runner_id]) + "/" + + std::to_string(expected_input_count[runner_id]) + " inputs for ANY condition."); } - if (input_count_tracker[uid] > 0) + if (input_count_tracker[runner_id] > 0) { - info_("has fullfilled the ANY condition with " + std::to_string(input_count_tracker[uid]) + " inputs."); + info_("has fullfilled the ANY condition with " + std::to_string(input_count_tracker[runner_id]) + " inputs."); - executionThreadPool[uid] = std::thread(&InputMultiplexAnyRunner::execution, this, uid); + executionThreadPool[runner_id] = std::thread(&InputMultiplexAnyRunner::execution, this, runner_id); } } - }; } // namespace capabilities2_runner diff --git a/capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp b/capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp index c02739d..10d2c61 100644 --- a/capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp +++ b/capabilities2_runner_system/include/capabilities2_runner_system/multiplex_base_runner.hpp @@ -31,70 +31,38 @@ class MultiplexBaseRunner : public RunnerBase /** * @brief Trigger process to be executed. * - * @param uid unique identifier for the execution + * @param id unique identifier for the execution */ - virtual void execution(int uid) + virtual void execution(int id) { - info_("execution started for uid: " + std::to_string(uid)); + info_("execution started for id: " + std::to_string(id)); - // trigger the events related to on_success state - if (events[uid].on_success.interface != "") + // check if the id is already completed + if (completed_executions.find(id) != completed_executions.end() && completed_executions[id]) { - event_(EventType::SUCCEEDED, uid, events[uid].on_success.interface, events[uid].on_success.provider); - triggerFunction_(events[uid].on_success.interface, update_on_success(events[uid].on_success.parameters)); + info_("execution already completed for id: " + std::to_string(id)); + return; } - // trigger the events related to on_failure state - else if (events[uid].on_failure.interface != "") + else { - event_(EventType::FAILED, uid, events[uid].on_failure.interface, events[uid].on_failure.provider); - triggerFunction_(events[uid].on_failure.interface, update_on_failure(events[uid].on_failure.parameters)); + // trigger the events related to on_success state + if (events[id].on_success.interface != "") + { + event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider); + triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters)); + } + // trigger the events related to on_failure state + else if (events[id].on_failure.interface != "") + { + event_(EventType::FAILED, id, events[id].on_failure.interface, events[id].on_failure.provider); + triggerFunction_(events[id].on_failure.interface, update_on_failure(events[id].on_failure.parameters)); + } } - } - - /** - * @brief attach events to the runner - * - * @param event_option event_options related for the action - * @param triggerFunction external function that triggers capability runners - * - * @return number of attached events - */ - virtual int attach_events(event_logger::event_opts& event_option, - std::function triggerFunction) override - { - info_("accepted event options with ID : " + std::to_string(insert_id)); - - triggerFunction_ = triggerFunction; - - tinyxml2::XMLElement* on_success_params = convert_to_xml(event_option.on_success.parameters); - - int uid = NULL; - // extract the uid from the event options from whatever runner is present by looping - if (event_option.on_success.interface != "") - { - tinyxml2::XMLElement* on_success_params = convert_to_xml(event_option.on_success.parameters); - on_success_params->QueryIntAttribute("uid", &uid); - } - else if (event_option.on_failure.interface != "") - { - tinyxml2::XMLElement* on_failure_params = convert_to_xml(event_option.on_failure.parameters); - on_failure_params->QueryIntAttribute("uid", &uid); - } - else if (event_option.on_started.interface != "") - { - tinyxml2::XMLElement* on_started_params = convert_to_xml(event_option.on_started.parameters); - on_started_params->QueryIntAttribute("uid", &uid); - } - else if (event_option.on_stopped.interface != "") - { - tinyxml2::XMLElement* on_stopped_params = convert_to_xml(event_option.on_stopped.parameters); - on_stopped_params->QueryIntAttribute("uid", &uid); - } + // track the execution as completed + completed_executions[id] = true; - events[uid] = event_option; - - return uid; + info_("multiplexing complete. Thread closing.", id); } /** @@ -118,5 +86,8 @@ class MultiplexBaseRunner : public RunnerBase // expected input count std::map expected_input_count; + + // completed executions + std::map completed_executions; }; } // namespace capabilities2_runner \ No newline at end of file diff --git a/capabilities2_runner_system/package.xml b/capabilities2_runner_system/package.xml index 2f3f807..dd79d51 100644 --- a/capabilities2_runner_system/package.xml +++ b/capabilities2_runner_system/package.xml @@ -24,6 +24,7 @@ std_msgs capabilities2_msgs capabilities2_runner + capabilities2_utils fabric_msgs event_logger event_logger_msgs diff --git a/capabilities2_server/CMakeLists.txt b/capabilities2_server/CMakeLists.txt index 0088818..bc9f429 100644 --- a/capabilities2_server/CMakeLists.txt +++ b/capabilities2_server/CMakeLists.txt @@ -19,6 +19,7 @@ find_package(bondcpp REQUIRED) find_package(pluginlib REQUIRED) find_package(capabilities2_msgs REQUIRED) find_package(capabilities2_runner REQUIRED) +find_package(capabilities2_utils REQUIRED) find_package(event_logger REQUIRED) find_package(event_logger_msgs REQUIRED) find_package(tinyxml2_vendor REQUIRED) @@ -62,6 +63,7 @@ ament_target_dependencies(${PROJECT_NAME}_comp rclcpp_components capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 @@ -105,6 +107,7 @@ ament_target_dependencies(${PROJECT_NAME}_node pluginlib capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs TinyXML2 diff --git a/capabilities2_server/include/capabilities2_server/capabilities_api.hpp b/capabilities2_server/include/capabilities2_server/capabilities_api.hpp index 97b8b24..e7a246a 100644 --- a/capabilities2_server/include/capabilities2_server/capabilities_api.hpp +++ b/capabilities2_server/include/capabilities2_server/capabilities_api.hpp @@ -16,8 +16,8 @@ #include #include #include +#include -#include #include #include @@ -241,7 +241,7 @@ class CapabilitiesAPI * @param capability capability from where the events originate * @param event_options event options for the capability */ - void set_triggers(const std::string& capability, event_logger::event_opts& event_options) + void set_triggers(const std::string& capability, capabilities2::event_opts& event_options) { try { diff --git a/capabilities2_server/include/capabilities2_server/capabilities_server.hpp b/capabilities2_server/include/capabilities2_server/capabilities_server.hpp index 32528b6..e5e5b30 100644 --- a/capabilities2_server/include/capabilities2_server/capabilities_server.hpp +++ b/capabilities2_server/include/capabilities2_server/capabilities_server.hpp @@ -16,7 +16,6 @@ #include #include - #include #include #include @@ -33,8 +32,9 @@ #include #include +#include + #include -#include #include namespace capabilities2_server @@ -318,7 +318,9 @@ class CapabilitiesServer : public rclcpp::Node, public CapabilitiesAPI void configure_capability_cb(const std::shared_ptr req, std::shared_ptr res) { - event_logger::event_opts event_options; + capabilities2::event_opts event_options; + + event_options.event_id = req->trigger_id; event_options.on_started.interface = req->target_on_start.capability; event_options.on_started.provider = req->target_on_start.provider; @@ -348,6 +350,11 @@ class CapabilitiesServer : public rclcpp::Node, public CapabilitiesAPI event_->runner_define(req->source.capability, req->source.provider, req->target_on_stop.capability, req->target_on_stop.provider, event_logger_msgs::msg::Event::STOPPED, req->connection_description); + event_->info("on_started : " + event_options.on_started.parameters); + event_->info("on_failure : " + event_options.on_failure.parameters); + event_->info("on_success : " + event_options.on_success.parameters); + event_->info("on_stopped : " + event_options.on_stopped.parameters); + // setup triggers between parameters set_triggers(req->source.capability, event_options); diff --git a/capabilities2_server/include/capabilities2_server/runner_cache.hpp b/capabilities2_server/include/capabilities2_server/runner_cache.hpp index 4c1aac1..db7c9fe 100644 --- a/capabilities2_server/include/capabilities2_server/runner_cache.hpp +++ b/capabilities2_server/include/capabilities2_server/runner_cache.hpp @@ -112,7 +112,7 @@ class RunnerCache * @param on_success on_success event with capability and parameters * @param on_stopped on_stop event with capability and parameters */ - void set_runner_triggers(const std::string& capability, event_logger::event_opts& event_options) + void set_runner_triggers(const std::string& capability, capabilities2::event_opts& event_options) { runner_cache_[capability]->attach_events(event_options, std::bind(&capabilities2_server::RunnerCache::trigger_runner, this, diff --git a/capabilities2_server/package.xml b/capabilities2_server/package.xml index 0a8ab8a..3efffc8 100644 --- a/capabilities2_server/package.xml +++ b/capabilities2_server/package.xml @@ -23,6 +23,7 @@ rclcpp_components capabilities2_msgs capabilities2_runner + capabilities2_utils event_logger event_logger_msgs tinyxml2_vendor diff --git a/capabilities2_utils/include/capabilities2_utils/connection.hpp b/capabilities2_utils/include/capabilities2_utils/connection.hpp index c88ede3..4d8acca 100644 --- a/capabilities2_utils/include/capabilities2_utils/connection.hpp +++ b/capabilities2_utils/include/capabilities2_utils/connection.hpp @@ -26,6 +26,7 @@ namespace capabilities2 connection_t target_on_success; connection_t target_on_failure; std::string connection_description; + int trigger_id = -1; }; } // namespace capabilities2 diff --git a/capabilities2_utils/include/capabilities2_utils/event_types.hpp b/capabilities2_utils/include/capabilities2_utils/event_types.hpp new file mode 100644 index 0000000..9f756ef --- /dev/null +++ b/capabilities2_utils/include/capabilities2_utils/event_types.hpp @@ -0,0 +1,47 @@ +#pragma once +#include + +namespace capabilities2 +{ +enum event_t +{ + IDLE, + STARTED, + STOPPED, + FAILED, + SUCCEEDED +}; + +/** + * @brief event definition + * + * Contains the informations about the event to be executed. It contains the interface, provider and parameters + */ +struct event +{ + std::string interface; + std::string provider; + std::string parameters; +}; + +/** + * @brief event options + * + * keeps track of events that are related to runner instances at various points of the + * plan + * @param event_id unique identifier for the event + * @param on_started information about the capability to execute on start + * @param on_success information about the capability to execute on success + * @param on_failure information about the capability to execute on failure + * @param on_stopped information about the capability to execute on stop + */ +struct event_opts +{ + int event_id = -1; + event on_started; + event on_success; + event on_failure; + event on_stopped; +}; + +} \ No newline at end of file From 6a448731a0480915d5ed1e174337644aa174bcc9 Mon Sep 17 00:00:00 2001 From: Kalana Ratnayake Date: Sun, 17 Aug 2025 20:46:10 +1000 Subject: [PATCH 5/5] redundant parameter cleaning --- .../robotpose_runner.hpp | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/capabilities2_runner_nav2/include/capabilities2_runner_nav2/robotpose_runner.hpp b/capabilities2_runner_nav2/include/capabilities2_runner_nav2/robotpose_runner.hpp index 5ce0429..4f7d8bd 100644 --- a/capabilities2_runner_nav2/include/capabilities2_runner_nav2/robotpose_runner.hpp +++ b/capabilities2_runner_nav2/include/capabilities2_runner_nav2/robotpose_runner.hpp @@ -55,18 +55,15 @@ class RobotPoseRunner : public RunnerBase const char* odom; const char* robot; - execute_id += 1; - // if parameters are not provided then cannot proceed if (!parameters_[id]) throw runner_exception("cannot grab data without parameters"); // trigger the events related to on_started state - if (events[execute_id].on_started.interface != "") + if (events[id].on_started.interface != "") { - event_(EventType::STARTED, id, events[execute_id].on_started.interface, events[execute_id].on_started.provider); - triggerFunction_(events[execute_id].on_started.interface, - update_on_started(events[execute_id].on_started.parameters)); + event_(EventType::STARTED, id, events[id].on_started.interface, events[id].on_started.provider); + triggerFunction_(events[id].on_started.interface, update_on_started(events[id].on_started.parameters)); } info_("Waiting for Transformation.", id); @@ -87,12 +84,10 @@ class RobotPoseRunner : public RunnerBase transform_ = tf_buffer_->lookupTransform(mapFrame, robotFrame, tf2::TimePointZero); // trigger the events related to on_success state - if (events[execute_id].on_success.interface != "") + if (events[id].on_success.interface != "") { - event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, - events[execute_id].on_success.provider); - triggerFunction_(events[execute_id].on_success.interface, - update_on_success(events[execute_id].on_success.parameters)); + event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider); + triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters)); } info_("Transformation received. Thread closing.", id); @@ -109,12 +104,10 @@ class RobotPoseRunner : public RunnerBase transform_ = tf_buffer_->lookupTransform(odomFrame, robotFrame, tf2::TimePointZero); // trigger the events related to on_success state - if (events[execute_id].on_success.interface != "") + if (events[id].on_success.interface != "") { - event_(EventType::SUCCEEDED, id, events[execute_id].on_success.interface, - events[execute_id].on_success.provider); - triggerFunction_(events[execute_id].on_success.interface, - update_on_success(events[execute_id].on_success.parameters)); + event_(EventType::SUCCEEDED, id, events[id].on_success.interface, events[id].on_success.provider); + triggerFunction_(events[id].on_success.interface, update_on_success(events[id].on_success.parameters)); } info_("Transformation received. Thread closing.", id); @@ -124,11 +117,10 @@ class RobotPoseRunner : public RunnerBase info_("Could not transform from odom to robot: " + std::string(ex.what()), id); // trigger the events related to on_failure state - if (events[execute_id].on_failure.interface != "") + if (events[id].on_failure.interface != "") { - event_(EventType::FAILED, id, events[execute_id].on_failure.interface, events[execute_id].on_failure.provider); - triggerFunction_(events[execute_id].on_failure.interface, - update_on_failure(events[execute_id].on_failure.parameters)); + event_(EventType::FAILED, id, events[id].on_failure.interface, events[id].on_failure.provider); + triggerFunction_(events[id].on_failure.interface, update_on_failure(events[id].on_failure.parameters)); } info_("Transformation not received. Thread closing.", id); @@ -154,12 +146,11 @@ class RobotPoseRunner : public RunnerBase throw runner_exception("cannot stop runner subscriber that was not started"); // Trigger on_stopped event if defined - if (events[execute_id].on_stopped.interface != "") + if (events[runner_id].on_stopped.interface != "") { - event_(EventType::STOPPED, -1, events[execute_id].on_stopped.interface, - events[execute_id].on_stopped.provider); - triggerFunction_(events[execute_id].on_stopped.interface, - update_on_stopped(events[execute_id].on_stopped.parameters)); + event_(EventType::STOPPED, -1, events[runner_id].on_stopped.interface, events[runner_id].on_stopped.provider); + triggerFunction_(events[runner_id].on_stopped.interface, + update_on_stopped(events[runner_id].on_stopped.parameters)); } }