Skip to content
This repository was archived by the owner on Mar 28, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions poetry.lock

Large diffs are not rendered by default.

19 changes: 12 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,26 @@ exclude = '''
'''

[tool.isort]
use_parentheses=true
multi_line_output=3
include_trailing_comma=true
line_length=79
known_first_party=['task_processor']
known_third_party=['django', 'rest_framework', 'saml2', 'drf_yasg2', 'pytest']
use_parentheses = true
multi_line_output = 3
include_trailing_comma = true
line_length = 79
known_first_party = ['task_processor']
known_third_party = ['django', 'rest_framework', 'saml2', 'drf_yasg2', 'pytest']
skip = ['migrations', 'flagsmith', '.venv']

[tool.pytest.ini_options]
addopts = ['--ds=tests.settings', '-vvvv', '-p', 'no:warnings']
console_output_style = 'count'

[tool.poetry]
name = "flagsmith_task_processor"
version = "1.0.0"
description = "Task Processor plugin for Flagsmith application."
authors = ["Flagsmith <support@flagsmith.com>"]
readme = "readme.md"
include = [{ path = "migrations/sql/*", format = ["sdist", "wheel"] }]
packages = [{ include = "task_processor"}]
packages = [{ include = "task_processor" }]

[tool.poetry.dependencies]
python = ">=3.10,<4.0"
Expand All @@ -50,6 +54,7 @@ drf-yasg = "~1.21.6"
dj-database-url = "~0.5.0"
environs = "~9.2.0"
psycopg2-binary = "~2.9.5"
gunicorn = "*"

[tool.poetry.group.dev.dependencies]
django = "~4.2.18"
Expand Down

This file was deleted.

86 changes: 22 additions & 64 deletions task_processor/management/commands/runprocessor.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import logging
import signal
import time
from argparse import ArgumentParser
from datetime import timedelta

from django.core.management import BaseCommand
from django.utils import timezone
from gunicorn.config import Config

from task_processor.task_registry import initialise
from task_processor.thread_monitoring import (
clear_unhealthy_threads,
write_unhealthy_threads,
)
from task_processor.threads import TaskRunner
from task_processor.threads import TaskRunner, TaskRunnerCoordinator
from task_processor.types import TaskProcessorConfig
from task_processor.utils import run_server

logger = logging.getLogger(__name__)

Expand All @@ -21,9 +15,6 @@ class Command(BaseCommand):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)

self._threads: list[TaskRunner] = []
self._monitor_threads = True

Expand Down Expand Up @@ -52,60 +43,27 @@ def add_arguments(self, parser: ArgumentParser):
help="Number of tasks each worker will pop from the queue on each cycle.",
default=10,
)

def handle(self, *args, **options):
num_threads = options["numthreads"]
sleep_interval_ms = options["sleepintervalms"]
grace_period_ms = options["graceperiodms"]
queue_pop_size = options["queuepopsize"]

logger.debug(
"Running task processor with args: %s",
",".join([f"{k}={v}" for k, v in options.items()]),
parser.add_subparsers(dest="gunicorn").add_parser(
"gunicorn arguments",
add_help=False,
aliases=["gunicorn"],
parents=[Config().parser()],
)

self._threads.extend(
[
TaskRunner(
sleep_interval_millis=sleep_interval_ms,
queue_pop_size=queue_pop_size,
)
for _ in range(num_threads)
]
def handle(self, *args, **options):
config = TaskProcessorConfig(
num_threads=options["numthreads"],
sleep_interval_ms=options["sleepintervalms"],
grace_period_ms=options["graceperiodms"],
queue_pop_size=options["queuepopsize"],
)

logger.info("Processor starting")

initialise()

for thread in self._threads:
thread.start()

clear_unhealthy_threads()
while self._monitor_threads:
time.sleep(1)
unhealthy_threads = self._get_unhealthy_threads(
ms_before_unhealthy=grace_period_ms + sleep_interval_ms
)
if unhealthy_threads:
write_unhealthy_threads(unhealthy_threads)

[t.join() for t in self._threads]

def _exit_gracefully(self, *args):
self._monitor_threads = False
for t in self._threads:
t.stop()
logger.debug("Config: %s", config)

def _get_unhealthy_threads(self, ms_before_unhealthy: int) -> list[TaskRunner]:
unhealthy_threads = []
healthy_threshold = timezone.now() - timedelta(milliseconds=ms_before_unhealthy)
coordinator = TaskRunnerCoordinator(config=config)
coordinator.start()

for thread in self._threads:
if (
not thread.is_alive()
or not thread.last_checked_for_tasks
or thread.last_checked_for_tasks < healthy_threshold
):
unhealthy_threads.append(thread)
return unhealthy_threads
try:
run_server(options=options)
finally:
coordinator.stop()
34 changes: 0 additions & 34 deletions task_processor/thread_monitoring.py

This file was deleted.

61 changes: 61 additions & 0 deletions task_processor/threads.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,76 @@
import logging
import time
from datetime import timedelta
from threading import Thread

from django.db import close_old_connections
from django.utils import timezone

from task_processor.processor import run_recurring_tasks, run_tasks
from task_processor.task_registry import initialise
from task_processor.types import TaskProcessorConfig

logger = logging.getLogger(__name__)


class TaskRunnerCoordinator(Thread):
def __init__(
self,
*args,
config: TaskProcessorConfig,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.config = config
self._threads: list[TaskRunner] = []
self._monitor_threads = True

def run(self) -> None:
initialise()

logger.info("Processor starting")

for _ in range(self.config.num_threads):
self._threads.append(
task := TaskRunner(
sleep_interval_millis=self.config.sleep_interval_ms,
queue_pop_size=self.config.queue_pop_size,
)
)
task.start()

ms_before_unhealthy = (
self.config.grace_period_ms + self.config.sleep_interval_ms
)
while self._monitor_threads:
time.sleep(1)
unhealthy_threads = self._get_unhealthy_threads(
ms_before_unhealthy=ms_before_unhealthy
)
if unhealthy_threads:
logger.warning("%d unhealthy threads detected", len(unhealthy_threads))

[t.join() for t in self._threads]

def _get_unhealthy_threads(self, ms_before_unhealthy: int) -> list["TaskRunner"]:
unhealthy_threads = []
healthy_threshold = timezone.now() - timedelta(milliseconds=ms_before_unhealthy)

for thread in self._threads:
if (
not thread.is_alive()
or not thread.last_checked_for_tasks
or thread.last_checked_for_tasks < healthy_threshold
):
unhealthy_threads.append(thread)
return unhealthy_threads

def stop(self) -> None:
self._monitor_threads = False
for t in self._threads:
t.stop()


class TaskRunner(Thread):
def __init__(
self,
Expand Down
9 changes: 9 additions & 0 deletions task_processor/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dataclasses import dataclass


@dataclass
class TaskProcessorConfig:
num_threads: int
sleep_interval_ms: int
grace_period_ms: int
queue_pop_size: int
33 changes: 33 additions & 0 deletions task_processor/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from gunicorn.app.wsgiapp import WSGIApplication as GunicornWSGIApplication


class _WSGIApplication(GunicornWSGIApplication):
def __init__(
self,
app_uri: str,
options: dict[str, str] | None = None,
) -> None:
self.options = options or {}
self.app_uri = app_uri
super().__init__()

def load_config(self) -> None:
config = {
key: value
for key, value in self.options.items()
if key in self.cfg.settings and value is not None
}
for key, value in config.items():
self.cfg.set(key.lower(), value)


def run_server(
app_uri: str = "app.wsgi",
options: dict[str, str] | None = None,
) -> None:
options = options or {}
# Defaults suitable for Task processor configuration
# intended to only serve the health check endpoints
options.setdefault("worker_class", "sync")
options.setdefault("workers", 1)
_WSGIApplication(app_uri, options).run()
Loading