Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ Fixed

* Fix redis SSL problems with sentinel #5660

Added
~~~~~

* Added graceful shutdown for workflow engine. #5463
Contributed by @khushboobhatia01

3.7.0 - May 05, 2022
--------------------
Expand Down
4 changes: 4 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ logging = /etc/st2/logging.timersengine.conf
webui_base_url = https://localhost

[workflow_engine]
# How long to wait (in seconds) for the process to exit after receiving the shutdown signal.
exit_still_active_check = 300
# Max seconds to allow a workflow execution to be idle before it is identified as orphaned and cancelled by the garbage collector. A value of zero means the feature is disabled. This is disabled by default.
gc_max_idle_sec = 0
# Location of the logging configuration file.
Expand All @@ -373,4 +375,6 @@ retry_max_jitter_msec = 1000
retry_stop_max_msec = 60000
# Interval between retries.
retry_wait_fixed_msec = 1000
# Time interval between subsequent queries to check executions handled by WFE.
still_active_check_interval = 2

5 changes: 2 additions & 3 deletions st2actions/st2actions/cmd/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
__all__ = ["main"]

LOG = logging.getLogger(__name__)
WORKFLOW_ENGINE = "workflow_engine"


def setup_sigterm_handler(engine):
Expand All @@ -53,7 +52,7 @@ def sigterm_handler(signum=None, frame=None):
def setup():
capabilities = {"name": "workflowengine", "type": "passive"}
common_setup(
service=WORKFLOW_ENGINE,
service=workflows.WORKFLOW_ENGINE,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -72,7 +71,7 @@ def run_server():
engine.start(wait=True)
except (KeyboardInterrupt, SystemExit):
LOG.info("(PID=%s) Workflow engine stopped.", os.getpid())
deregister_service(service=WORKFLOW_ENGINE)
deregister_service(service=workflows.WORKFLOW_ENGINE)
engine.shutdown()
except:
LOG.exception("(PID=%s) Workflow engine unexpectedly stopped.", os.getpid())
Expand Down
74 changes: 72 additions & 2 deletions st2actions/st2actions/workflows/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,29 @@
# limitations under the License.

from __future__ import absolute_import
from oslo_config import cfg

from orquesta import statuses

from tooz.coordination import GroupNotCreated
from st2common.services import coordination
from eventlet.semaphore import Semaphore
from eventlet import spawn_after
from st2common.constants import action as ac_const
from st2common import log as logging
from st2common.metrics import base as metrics
from st2common.models.db import execution as ex_db_models
from st2common.models.db import workflow as wf_db_models
from st2common.persistence import liveaction as lv_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.persistence import execution as ex_db_access
from st2common.services import action as ac_svc
from st2common.services import policies as pc_svc
from st2common.services import workflows as wf_svc
from st2common.transport import consumers
from st2common.transport import queues
from st2common.transport import utils as txpt_utils

from st2common.util import concurrency
from st2common.util import action_db as action_utils

LOG = logging.getLogger(__name__)

Expand All @@ -40,10 +47,17 @@
queues.WORKFLOW_ACTION_EXECUTION_UPDATE_QUEUE,
]

WORKFLOW_ENGINE = "workflow_engine"
WORKFLOW_ENGINE_START_STOP_SEQ = "workflow_engine_start_stop_seq"


class WorkflowExecutionHandler(consumers.VariableMessageHandler):
def __init__(self, connection, queues):
super(WorkflowExecutionHandler, self).__init__(connection, queues)
self._active_messages = 0
self._semaphore = Semaphore()
# This is required to ensure workflows stuck in pausing state after shutdown transition to paused state after engine startup.
self._delay = 30

def handle_workflow_execution_with_instrumentation(wf_ex_db):
with metrics.CounterWithTimer(key="orquesta.workflow.executions"):
Expand Down Expand Up @@ -78,13 +92,69 @@ def process(self, message):
raise ValueError(msg)

try:
with self._semaphore:
self._active_messages += 1
handler_function(message)
except Exception as e:
# If the exception is caused by DB connection error, then the following
# error handling routine will fail as well because it will try to update
# the database and fail the workflow execution gracefully. In this case,
# the garbage collector will find and cancel these workflow executions.
self.fail_workflow_execution(message, e)
finally:
with self._semaphore:
self._active_messages -= 1

def start(self, wait):
    """Begin consuming workflow messages.

    Before the consumer starts, schedule a one-shot delayed task that
    resumes any workflows this engine paused during a previous shutdown.
    The delay gives workflows stuck in a pausing state time to transition
    to paused after engine startup.
    """
    resume_task = self._resume_workflows_paused_during_shutdown
    spawn_after(self._delay, resume_task)
    super(WorkflowExecutionHandler, self).start(wait=wait)

def shutdown(self):
    """Gracefully shut down the message handler.

    Stops consuming new messages, waits (bounded by configuration) for
    in-flight messages to drain, and — if this appears to be the last
    workflow engine in the service registry — requests a pause on all
    running orquesta workflow executions so a later engine instance can
    resume them on startup.
    """
    # Stop accepting new messages first so _active_messages can only decrease.
    super(WorkflowExecutionHandler, self).shutdown()
    exit_timeout = cfg.CONF.workflow_engine.exit_still_active_check
    sleep_delay = cfg.CONF.workflow_engine.still_active_check_interval
    # Elapsed seconds spent waiting for in-flight messages to finish.
    timeout = 0

    # Bounded busy-wait: poll until all active message handlers complete
    # or the configured exit timeout elapses.
    while timeout < exit_timeout and self._active_messages > 0:
        concurrency.sleep(sleep_delay)
        timeout += sleep_delay

    coordinator = coordination.get_coordinator()
    member_ids = []
    # Serialize engine start/stop sequences across instances so the
    # membership check and pause pass are not interleaved with another
    # engine starting up or shutting down.
    with coordinator.get_lock(WORKFLOW_ENGINE_START_STOP_SEQ):
        try:
            group_id = coordination.get_group_id(WORKFLOW_ENGINE)
            member_ids = list(coordinator.get_members(group_id).get())
        except GroupNotCreated:
            # No engine group exists yet; treat as "no other members".
            pass

        # Check if there are other WFEs in service registry. Only pause
        # running workflows when this is the last engine instance.
        if cfg.CONF.coordination.service_registry and not member_ids:
            ac_ex_dbs = self._get_running_workflows()
            for ac_ex_db in ac_ex_dbs:
                lv_ac = action_utils.get_liveaction_by_id(ac_ex_db.liveaction["id"])
                ac_svc.request_pause(lv_ac, WORKFLOW_ENGINE_START_STOP_SEQ)

def _get_running_workflows(self):
    """Return action execution records for orquesta workflows that are
    currently in the running state.
    """
    return ex_db_access.ActionExecution.query(
        runner__name="orquesta",
        status=ac_const.LIVEACTION_STATUS_RUNNING,
    )

def _get_workflows_paused_during_shutdown(self):
    """Return liveaction records for workflows that were paused by a
    workflow engine during its shutdown sequence (identified via the
    ``paused_by`` marker in the action context).
    """
    return lv_db_access.LiveAction.query(
        status=ac_const.LIVEACTION_STATUS_PAUSED,
        context__paused_by=WORKFLOW_ENGINE_START_STOP_SEQ,
    )

def _resume_workflows_paused_during_shutdown(self):
    """Request resume on every workflow paused by an engine shutdown.

    Runs under the engine start/stop coordination lock so only one engine
    instance performs the resume pass at a time.
    """
    coordinator = coordination.get_coordinator()
    start_stop_lock = coordinator.get_lock(WORKFLOW_ENGINE_START_STOP_SEQ)
    with start_stop_lock:
        for paused_lv_ac in self._get_workflows_paused_during_shutdown():
            ac_svc.request_resume(paused_lv_ac, WORKFLOW_ENGINE_START_STOP_SEQ)

def fail_workflow_execution(self, message, exception):
# Prepare attributes based on message type.
Expand Down
Loading