diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 196cb001cc..86b68c3d4d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -17,6 +17,7 @@ Added * Add ``get_entrypoint()`` method to ``ActionResourceManager`` attribute of st2client. #4791 * Add support for orquesta task retry. (new feature) +* Add config option ``scheduler.execution_scheduling_timeout_threshold_min`` to better control the cleanup of scheduled actions that were orphaned. #4886 Changed ~~~~~~~ diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 967ed611fa..bd99093c24 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -273,6 +273,8 @@ retry_max_attempt = 10 logging = /etc/st2/logging.scheduler.conf # How long (in seconds) to sleep between each action scheduler main loop run interval. sleep_interval = 0.1 +# How long GC to search back in minutes for orphaned scheduled actions +execution_scheduling_timeout_threshold_min = 1 # The size of the pool used by the scheduler for scheduling executions. pool_size = 10 # The number of milliseconds to wait in between retries. diff --git a/st2actions/st2actions/scheduler/config.py b/st2actions/st2actions/scheduler/config.py index e507796007..ceee6d0d6c 100644 --- a/st2actions/st2actions/scheduler/config.py +++ b/st2actions/st2actions/scheduler/config.py @@ -50,6 +50,9 @@ def _register_service_opts(): default='/etc/st2/logging.scheduler.conf', help='Location of the logging configuration file.' 
), + cfg.FloatOpt( + 'execution_scheduling_timeout_threshold_min', default=1, + help='How long GC to search back in minutes for orphaned scheduled actions'), cfg.IntOpt( 'pool_size', default=10, help='The size of the pool used by the scheduler for scheduling executions.'), diff --git a/st2actions/st2actions/scheduler/handler.py b/st2actions/st2actions/scheduler/handler.py index b252c520e1..c0a1e4e8b4 100644 --- a/st2actions/st2actions/scheduler/handler.py +++ b/st2actions/st2actions/scheduler/handler.py @@ -44,14 +44,6 @@ LOG = logging.getLogger(__name__) -# If an ActionExecutionSchedulingQueueItemDB object hasn't been updated fore more than this amount -# of milliseconds, it will be marked as "handled=False". -# As soon as an item is picked by scheduler to be processed, it should be processed very fast -# (< 5 seconds). If an item is still being marked as processing it likely indicates that the -# scheduler process which was processing that item crashed or similar so we need to mark it as -# "handling=False" so some other scheduler process can pick it up. -EXECUTION_SCHEDUELING_TIMEOUT_THRESHOLD_MS = (60 * 1000) - # When a policy delayed execution is detected it will be try to be rescheduled by the scheduler # again in this amount of milliseconds. POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS = 2500 @@ -62,6 +54,14 @@ def __init__(self): self.message_type = LiveActionDB self._shutdown = False self._pool = eventlet.GreenPool(size=cfg.CONF.scheduler.pool_size) + # If an ActionExecutionSchedulingQueueItemDB object hasn't been updated for more than + # this amount of milliseconds, it will be marked as "handling=False". + # As soon as an item is picked by scheduler to be processed, it should be processed very + # fast (< 5 seconds). If an item is still being marked as processing it likely indicates + # that the scheduler process which was processing that item crashed or similar so we need + # to mark it as "handling=False" so some other scheduler process can pick it up. 
+ self._execution_scheduling_timeout_threshold_ms = \ + cfg.CONF.scheduler.execution_scheduling_timeout_threshold_min * 60 * 1000 self._coordinator = coordination_service.get_coordinator(start_heart=True) self._main_thread = None self._cleanup_thread = None @@ -100,7 +100,7 @@ def _reset_handling_flag(self): query = { 'scheduled_start_timestamp__lte': date.append_milliseconds_to_time( date.get_datetime_utc_now(), - -EXECUTION_SCHEDUELING_TIMEOUT_THRESHOLD_MS + -self._execution_scheduling_timeout_threshold_ms ), 'handling': True } diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index 3f67d4c119..169af5117a 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -283,6 +283,9 @@ def _register_ssh_runner_opts(): def _register_scheduler_opts(): scheduler_opts = [ + cfg.FloatOpt( + 'execution_scheduling_timeout_threshold_min', default=1, + help='How long GC to search back in minutes for orphaned scheduled actions'), cfg.IntOpt( 'pool_size', default=10, help='The size of the pool used by the scheduler for scheduling executions.'),