Skip to content
This repository was archived by the owner on Mar 28, 2025. It is now read-only.

Commit 966c02e

Browse files
authored
feat: Add HTTP server, remove unhealthy thread monitoring in favour of logging (#25)
1 parent c833e22 commit 966c02e

9 files changed

Lines changed: 226 additions & 215 deletions

File tree

poetry.lock

Lines changed: 89 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,26 @@ exclude = '''
2424
'''
2525

2626
[tool.isort]
27-
use_parentheses=true
28-
multi_line_output=3
29-
include_trailing_comma=true
30-
line_length=79
31-
known_first_party=['task_processor']
32-
known_third_party=['django', 'rest_framework', 'saml2', 'drf_yasg2', 'pytest']
27+
use_parentheses = true
28+
multi_line_output = 3
29+
include_trailing_comma = true
30+
line_length = 79
31+
known_first_party = ['task_processor']
32+
known_third_party = ['django', 'rest_framework', 'saml2', 'drf_yasg2', 'pytest']
3333
skip = ['migrations', 'flagsmith', '.venv']
3434

35+
[tool.pytest.ini_options]
36+
addopts = ['--ds=tests.settings', '-vvvv', '-p', 'no:warnings']
37+
console_output_style = 'count'
38+
3539
[tool.poetry]
3640
name = "flagsmith_task_processor"
3741
version = "1.0.0"
3842
description = "Task Processor plugin for Flagsmith application."
3943
authors = ["Flagsmith <support@flagsmith.com>"]
4044
readme = "readme.md"
4145
include = [{ path = "migrations/sql/*", format = ["sdist", "wheel"] }]
42-
packages = [{ include = "task_processor"}]
46+
packages = [{ include = "task_processor" }]
4347

4448
[tool.poetry.dependencies]
4549
python = ">=3.10,<4.0"
@@ -50,6 +54,7 @@ drf-yasg = "~1.21.6"
5054
dj-database-url = "~0.5.0"
5155
environs = "~9.2.0"
5256
psycopg2-binary = "~2.9.5"
57+
gunicorn = "*"
5358

5459
[tool.poetry.group.dev.dependencies]
5560
django = "~4.2.18"

task_processor/management/commands/checktaskprocessorthreadhealth.py

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 22 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,12 @@
11
import logging
2-
import signal
3-
import time
42
from argparse import ArgumentParser
5-
from datetime import timedelta
63

74
from django.core.management import BaseCommand
8-
from django.utils import timezone
5+
from gunicorn.config import Config
96

10-
from task_processor.task_registry import initialise
11-
from task_processor.thread_monitoring import (
12-
clear_unhealthy_threads,
13-
write_unhealthy_threads,
14-
)
15-
from task_processor.threads import TaskRunner
7+
from task_processor.threads import TaskRunner, TaskRunnerCoordinator
8+
from task_processor.types import TaskProcessorConfig
9+
from task_processor.utils import run_server
1610

1711
logger = logging.getLogger(__name__)
1812

@@ -21,9 +15,6 @@ class Command(BaseCommand):
2115
def __init__(self, *args, **kwargs):
2216
super().__init__(*args, **kwargs)
2317

24-
signal.signal(signal.SIGINT, self._exit_gracefully)
25-
signal.signal(signal.SIGTERM, self._exit_gracefully)
26-
2718
self._threads: list[TaskRunner] = []
2819
self._monitor_threads = True
2920

@@ -52,60 +43,27 @@ def add_arguments(self, parser: ArgumentParser):
5243
help="Number of tasks each worker will pop from the queue on each cycle.",
5344
default=10,
5445
)
55-
56-
def handle(self, *args, **options):
57-
num_threads = options["numthreads"]
58-
sleep_interval_ms = options["sleepintervalms"]
59-
grace_period_ms = options["graceperiodms"]
60-
queue_pop_size = options["queuepopsize"]
61-
62-
logger.debug(
63-
"Running task processor with args: %s",
64-
",".join([f"{k}={v}" for k, v in options.items()]),
46+
parser.add_subparsers(dest="gunicorn").add_parser(
47+
"gunicorn arguments",
48+
add_help=False,
49+
aliases=["gunicorn"],
50+
parents=[Config().parser()],
6551
)
6652

67-
self._threads.extend(
68-
[
69-
TaskRunner(
70-
sleep_interval_millis=sleep_interval_ms,
71-
queue_pop_size=queue_pop_size,
72-
)
73-
for _ in range(num_threads)
74-
]
53+
def handle(self, *args, **options):
54+
config = TaskProcessorConfig(
55+
num_threads=options["numthreads"],
56+
sleep_interval_ms=options["sleepintervalms"],
57+
grace_period_ms=options["graceperiodms"],
58+
queue_pop_size=options["queuepopsize"],
7559
)
7660

77-
logger.info("Processor starting")
78-
79-
initialise()
80-
81-
for thread in self._threads:
82-
thread.start()
83-
84-
clear_unhealthy_threads()
85-
while self._monitor_threads:
86-
time.sleep(1)
87-
unhealthy_threads = self._get_unhealthy_threads(
88-
ms_before_unhealthy=grace_period_ms + sleep_interval_ms
89-
)
90-
if unhealthy_threads:
91-
write_unhealthy_threads(unhealthy_threads)
92-
93-
[t.join() for t in self._threads]
94-
95-
def _exit_gracefully(self, *args):
96-
self._monitor_threads = False
97-
for t in self._threads:
98-
t.stop()
61+
logger.debug("Config: %s", config)
9962

100-
def _get_unhealthy_threads(self, ms_before_unhealthy: int) -> list[TaskRunner]:
101-
unhealthy_threads = []
102-
healthy_threshold = timezone.now() - timedelta(milliseconds=ms_before_unhealthy)
63+
coordinator = TaskRunnerCoordinator(config=config)
64+
coordinator.start()
10365

104-
for thread in self._threads:
105-
if (
106-
not thread.is_alive()
107-
or not thread.last_checked_for_tasks
108-
or thread.last_checked_for_tasks < healthy_threshold
109-
):
110-
unhealthy_threads.append(thread)
111-
return unhealthy_threads
66+
try:
67+
run_server(options=options)
68+
finally:
69+
coordinator.stop()

task_processor/thread_monitoring.py

Lines changed: 0 additions & 34 deletions
This file was deleted.

task_processor/threads.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,76 @@
11
import logging
22
import time
3+
from datetime import timedelta
34
from threading import Thread
45

56
from django.db import close_old_connections
67
from django.utils import timezone
78

89
from task_processor.processor import run_recurring_tasks, run_tasks
10+
from task_processor.task_registry import initialise
11+
from task_processor.types import TaskProcessorConfig
912

1013
logger = logging.getLogger(__name__)
1114

1215

16+
class TaskRunnerCoordinator(Thread):
17+
def __init__(
18+
self,
19+
*args,
20+
config: TaskProcessorConfig,
21+
**kwargs,
22+
) -> None:
23+
super().__init__(*args, **kwargs)
24+
self.config = config
25+
self._threads: list[TaskRunner] = []
26+
self._monitor_threads = True
27+
28+
def run(self) -> None:
29+
initialise()
30+
31+
logger.info("Processor starting")
32+
33+
for _ in range(self.config.num_threads):
34+
self._threads.append(
35+
task := TaskRunner(
36+
sleep_interval_millis=self.config.sleep_interval_ms,
37+
queue_pop_size=self.config.queue_pop_size,
38+
)
39+
)
40+
task.start()
41+
42+
ms_before_unhealthy = (
43+
self.config.grace_period_ms + self.config.sleep_interval_ms
44+
)
45+
while self._monitor_threads:
46+
time.sleep(1)
47+
unhealthy_threads = self._get_unhealthy_threads(
48+
ms_before_unhealthy=ms_before_unhealthy
49+
)
50+
if unhealthy_threads:
51+
logger.warning("%d unhealthy threads detected", len(unhealthy_threads))
52+
53+
[t.join() for t in self._threads]
54+
55+
def _get_unhealthy_threads(self, ms_before_unhealthy: int) -> list["TaskRunner"]:
56+
unhealthy_threads = []
57+
healthy_threshold = timezone.now() - timedelta(milliseconds=ms_before_unhealthy)
58+
59+
for thread in self._threads:
60+
if (
61+
not thread.is_alive()
62+
or not thread.last_checked_for_tasks
63+
or thread.last_checked_for_tasks < healthy_threshold
64+
):
65+
unhealthy_threads.append(thread)
66+
return unhealthy_threads
67+
68+
def stop(self) -> None:
69+
self._monitor_threads = False
70+
for t in self._threads:
71+
t.stop()
72+
73+
1374
class TaskRunner(Thread):
1475
def __init__(
1576
self,

task_processor/types.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
5+
class TaskProcessorConfig:
6+
num_threads: int
7+
sleep_interval_ms: int
8+
grace_period_ms: int
9+
queue_pop_size: int

task_processor/utils.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from gunicorn.app.wsgiapp import WSGIApplication as GunicornWSGIApplication
2+
3+
4+
class _WSGIApplication(GunicornWSGIApplication):
5+
def __init__(
6+
self,
7+
app_uri: str,
8+
options: dict[str, str] | None = None,
9+
) -> None:
10+
self.options = options or {}
11+
self.app_uri = app_uri
12+
super().__init__()
13+
14+
def load_config(self) -> None:
15+
config = {
16+
key: value
17+
for key, value in self.options.items()
18+
if key in self.cfg.settings and value is not None
19+
}
20+
for key, value in config.items():
21+
self.cfg.set(key.lower(), value)
22+
23+
24+
def run_server(
25+
app_uri: str = "app.wsgi",
26+
options: dict[str, str] | None = None,
27+
) -> None:
28+
options = options or {}
29+
# Defaults suitable for Task processor configuration
30+
# intended to only serve the health check endpoints
31+
options.setdefault("worker_class", "sync")
32+
options.setdefault("workers", 1)
33+
_WSGIApplication(app_uri, options).run()

0 commit comments

Comments
 (0)