diff --git a/contentcuration/contentcuration/migrations/0145_custom_task_metadata.py b/contentcuration/contentcuration/migrations/0145_custom_task_metadata.py new file mode 100644 index 0000000000..64287039f0 --- /dev/null +++ b/contentcuration/contentcuration/migrations/0145_custom_task_metadata.py @@ -0,0 +1,48 @@ +# Generated by Django 3.2.19 on 2023-09-14 05:08 +import django.core.validators +import django.db.models.deletion +from celery import states +from django.conf import settings +from django.db import migrations +from django.db import models + +def transfer_data(apps, schema_editor): + CustomTaskMetadata = apps.get_model('contentcuration', 'CustomTaskMetadata') + TaskResult = apps.get_model('django_celery_results', 'taskresult') + + old_task_results = TaskResult.objects.filter(status__in=states.UNREADY_STATES) + + for old_task_result in old_task_results: + CustomTaskMetadata.objects.create( + task_id=old_task_result.task_id, + user=old_task_result.user, + channel_id=old_task_result.channel_id, + progress=old_task_result.progress, + signature=old_task_result.signature, + ) + +class Migration(migrations.Migration): + + dependencies = [ + ('contentcuration', '0144_soft_delete_user'), + ] + + operations = [ + migrations.CreateModel( + name='CustomTaskMetadata', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('task_id', models.CharField(max_length=255, unique=True)), + ('channel_id', models.UUIDField(blank=True, db_index=True, null=True)), + ('progress', models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0), django.core.validators.MaxValueValidator(100)])), + ('signature', models.CharField(max_length=32, null=True)), + ('date_created', models.DateTimeField(auto_now_add=True, help_text='Datetime field when the custom_metadata for task was created in UTC', verbose_name='Created DateTime')), + ('user', models.ForeignKey(null=True, 
on_delete=django.db.models.deletion.CASCADE, related_name='tasks', to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.AddIndex( + model_name='customtaskmetadata', + index=models.Index(fields=['signature'], name='task_result_signature'), + ), + migrations.RunPython(transfer_data), + ] diff --git a/contentcuration/contentcuration/migrations/0146_drop_taskresult_fields.py b/contentcuration/contentcuration/migrations/0146_drop_taskresult_fields.py new file mode 100644 index 0000000000..5ecc6cb98f --- /dev/null +++ b/contentcuration/contentcuration/migrations/0146_drop_taskresult_fields.py @@ -0,0 +1,37 @@ +# Generated by Django 3.2.19 on 2023-09-14 10:42 +from django.db import migrations + +class Migration(migrations.Migration): + + replaces = [('django_celery_results', '0145_custom_task_metadata'),] + + def __init__(self, name, app_label): + super(Migration, self).__init__(name, 'django_celery_results') + + dependencies = [ + ('contentcuration', '0145_custom_task_metadata'), + ('contentcuration', '0141_add_task_signature'), + ] + + operations = [ + migrations.RemoveField( + model_name='taskresult', + name='channel_id', + ), + migrations.RemoveField( + model_name='taskresult', + name='progress', + ), + migrations.RemoveField( + model_name='taskresult', + name='user', + ), + migrations.RemoveField( + model_name='taskresult', + name='signature', + ), + migrations.RemoveIndex( + model_name='taskresult', + name='task_result_signature_idx', + ), + ] diff --git a/contentcuration/contentcuration/models.py b/contentcuration/contentcuration/models.py index 86a400931c..0d73096bfa 100644 --- a/contentcuration/contentcuration/models.py +++ b/contentcuration/contentcuration/models.py @@ -7,7 +7,6 @@ from datetime import datetime import pytz -from celery import states as celery_states from django.conf import settings from django.contrib.auth.base_user import AbstractBaseUser from django.contrib.auth.base_user import BaseUserManager @@ -46,7 +45,6 @@ from django.dispatch import 
receiver from django.utils import timezone from django.utils.translation import gettext as _ -from django_celery_results.models import TaskResult from django_cte import With from le_utils import proquint from le_utils.constants import content_kinds @@ -2566,58 +2564,29 @@ def serialize_to_change_dict(self): return self.serialize(self) -class TaskResultCustom(object): - """ - Custom fields to add to django_celery_results's TaskResult model - - If adding fields to this class, run `makemigrations` then move the generated migration from the - `django_celery_results` app to the `contentcuration` app and override the constructor to change - the app_label. See `0141_add_task_signature` for an example - """ +class CustomTaskMetadata(models.Model): + # Task_id for reference + task_id = models.CharField( + max_length=255, # Adjust the max_length as needed + unique=True, + ) # user shouldn't be null, but in order to append the field, this needs to be allowed user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="tasks", on_delete=models.CASCADE, null=True) channel_id = DjangoUUIDField(db_index=True, null=True, blank=True) progress = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0), MaxValueValidator(100)]) # a hash of the task name and kwargs for identifying repeat tasks signature = models.CharField(null=True, blank=False, max_length=32) - - super_as_dict = TaskResult.as_dict - - def as_dict(self): - """ - :return: A dictionary representation - """ - super_dict = self.super_as_dict() - super_dict.update( - user_id=self.user_id, - channel_id=self.channel_id, - progress=self.progress, - ) - return super_dict - - @classmethod - def contribute_to_class(cls, model_class=TaskResult): - """ - Adds fields to model, by default TaskResult - :param model_class: TaskResult model - """ - for field in dir(cls): - if not field.startswith("_") and field not in ('contribute_to_class', 'Meta'): - model_class.add_to_class(field, getattr(cls, field)) - - # 
manually add Meta afterwards - setattr(model_class._meta, 'indexes', getattr(model_class._meta, 'indexes', []) + cls.Meta.indexes) + date_created = models.DateTimeField( + auto_now_add=True, + verbose_name=_('Created DateTime'), + help_text=_('Datetime field when the custom_metadata for task was created in UTC') + ) class Meta: indexes = [ # add index that matches query usage for signature models.Index( fields=['signature'], - name='task_result_signature_idx', - condition=Q(status__in=celery_states.UNREADY_STATES), + name='task_result_signature', ), ] - - -# trigger class contributions immediately -TaskResultCustom.contribute_to_class() diff --git a/contentcuration/contentcuration/tests/helpers.py b/contentcuration/contentcuration/tests/helpers.py index 1e7a12a8f5..73371135be 100644 --- a/contentcuration/contentcuration/tests/helpers.py +++ b/contentcuration/contentcuration/tests/helpers.py @@ -3,10 +3,10 @@ import mock from celery import states +from django_celery_results.models import TaskResult from search.models import ContentNodeFullTextSearch from contentcuration.models import ContentNode -from contentcuration.models import TaskResult def clear_tasks(except_task_id=None): diff --git a/contentcuration/contentcuration/tests/test_asynctask.py b/contentcuration/contentcuration/tests/test_asynctask.py index 791c6378a5..4496680f9c 100644 --- a/contentcuration/contentcuration/tests/test_asynctask.py +++ b/contentcuration/contentcuration/tests/test_asynctask.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import threading +import time import uuid import pytest @@ -9,12 +10,11 @@ from celery.utils.log import get_task_logger from django.core.management import call_command from django.test import TransactionTestCase +from django_celery_results.models import TaskResult from . 
import testdata from .helpers import clear_tasks from contentcuration.celery import app -from contentcuration.models import TaskResult - logger = get_task_logger(__name__) @@ -36,6 +36,22 @@ def test_task(self, **kwargs): return 42 +# Create a Task that takes a bit longer to be executed +@app.task(bind=True, name="test_task_delayed") +def test_task_delayed(self, delay_seconds=100, **kwargs): + """ + This is a mock task that takes a bit longer to execute + so that revoke function can be called successfully without test being SUCCESS beforehand, + to be used ONLY for unit-testing various pieces of the + async task API. + :return: The number 42 + """ + time.sleep(delay_seconds) + logger.info("Request ID = {}".format(self.request.id)) + assert TaskResult.objects.filter(task_id=self.request.id).count() == 1 + return 42 + + @app.task(name="error_test_task") def error_test_task(**kwargs): """ @@ -88,11 +104,8 @@ def _celery_task_worker(): ]) -def _celery_progress_monitor(thread_event): - def _on_iteration(receiver): - if thread_event.is_set(): - receiver.should_stop = True - app.events.monitor_progress(on_iteration=_on_iteration) +def _return_celery_task_object(task_id): + return TaskResult.objects.get(task_id=task_id) class AsyncTaskTestCase(TransactionTestCase): @@ -108,11 +121,6 @@ class AsyncTaskTestCase(TransactionTestCase): @classmethod def setUpClass(cls): super(AsyncTaskTestCase, cls).setUpClass() - # start progress monitor in separate thread - cls.monitor_thread_event = threading.Event() - cls.monitor_thread = threading.Thread(target=_celery_progress_monitor, args=(cls.monitor_thread_event,)) - cls.monitor_thread.start() - # start celery worker in separate thread cls.worker_thread = threading.Thread(target=_celery_task_worker) cls.worker_thread.start() @@ -122,8 +130,6 @@ def tearDownClass(cls): super(AsyncTaskTestCase, cls).tearDownClass() # tell the work thread to stop through the celery control API if cls.worker_thread: - cls.monitor_thread_event.set() -
cls.monitor_thread.join(5) app.control.shutdown() cls.worker_thread.join(5) @@ -152,15 +158,14 @@ def test_asynctask_reports_success(self): contains the return value of the task. """ async_result = test_task.enqueue(self.user) - result = self._wait_for(async_result) task_result = async_result.get_model() + celery_task_result = TaskResult.objects.get(task_id=task_result.task_id) self.assertEqual(task_result.user, self.user) - self.assertEqual(result, 42) task_result.refresh_from_db() self.assertEqual(async_result.result, 42) - self.assertEqual(task_result.task_name, "test_task") + self.assertEqual(celery_task_result.task_name, "test_task") self.assertEqual(async_result.status, states.SUCCESS) self.assertEqual(TaskResult.objects.get(task_id=async_result.id).result, "42") self.assertEqual(TaskResult.objects.get(task_id=async_result.id).status, states.SUCCESS) @@ -177,11 +182,12 @@ def test_asynctask_reports_error(self): self._wait_for(async_result) task_result = async_result.get_model() - self.assertEqual(task_result.status, states.FAILURE) - self.assertIsNotNone(task_result.traceback) + celery_task_result = _return_celery_task_object(task_result.task_id) + self.assertEqual(celery_task_result.status, states.FAILURE) + self.assertIsNotNone(celery_task_result.traceback) self.assertIn( - "I'm sorry Dave, I'm afraid I can't do that.", task_result.result + "I'm sorry Dave, I'm afraid I can't do that.", celery_task_result.result ) def test_only_create_async_task_creates_task_entry(self): @@ -194,17 +200,6 @@ def test_only_create_async_task_creates_task_entry(self): self.assertEquals(result, 42) self.assertEquals(TaskResult.objects.filter(task_id=async_result.task_id).count(), 0) - def test_enqueue_task_adds_result_with_necessary_info(self): - async_result = test_task.enqueue(self.user, is_test=True) - try: - task_result = TaskResult.objects.get(task_id=async_result.task_id) - except TaskResult.DoesNotExist: - self.fail('Missing task result') - - 
self.assertEqual(task_result.task_name, test_task.name) - _, _, encoded_kwargs = test_task.backend.encode_content(dict(is_test=True)) - self.assertEqual(task_result.task_kwargs, encoded_kwargs) - @pytest.mark.skip(reason="This test is flaky on Github Actions") def test_fetch_or_enqueue_task(self): expected_task = test_task.enqueue(self.user, is_test=True) @@ -258,8 +253,11 @@ def test_requeue_task(self): def test_revoke_task(self): channel_id = uuid.uuid4() - async_result = test_task.enqueue(self.user, channel_id=channel_id) - test_task.revoke(channel_id=channel_id) + async_result = test_task_delayed.enqueue(self.user, channel_id=channel_id) + # A slight delay to let the task object be saved asynchronously. + # This delay is relatively small and hopefully won't cause any issues in practice. + time.sleep(0.5) + test_task_delayed.revoke(channel_id=channel_id) # this should raise an exception, even though revoked, because the task is in ready state but not success with self.assertRaises(Exception): diff --git a/contentcuration/contentcuration/tests/utils/test_garbage_collect.py b/contentcuration/contentcuration/tests/utils/test_garbage_collect.py index a718a408c2..f67daf8c28 100644 --- a/contentcuration/contentcuration/tests/utils/test_garbage_collect.py +++ b/contentcuration/contentcuration/tests/utils/test_garbage_collect.py @@ -11,6 +11,7 @@ from django.core.files.base import ContentFile from django.core.files.storage import default_storage from django.urls import reverse_lazy +from django_celery_results.models import TaskResult from le_utils.constants import content_kinds from le_utils.constants import file_formats from le_utils.constants import format_presets @@ -19,7 +20,6 @@ from contentcuration.constants import user_history from contentcuration.models import ContentNode from contentcuration.models import File -from contentcuration.models import TaskResult from contentcuration.models import UserHistory from contentcuration.tests.base import BaseAPITestCase from 
contentcuration.tests.base import StudioTestCase @@ -384,10 +384,9 @@ def test_clean_up(self): class CleanupTaskTestCase(StudioTestCase): def setUp(self): - user = self.admin_user - self.pruned_task = TaskResult.objects.create(task_id=uuid.uuid4().hex, status=states.SUCCESS, task_name="pruned_task", user_id=user.id) - self.failed_task = TaskResult.objects.create(task_id=uuid.uuid4().hex, status=states.FAILURE, task_name="failed_task", user_id=user.id) - self.recent_task = TaskResult.objects.create(task_id=uuid.uuid4().hex, status=states.SUCCESS, task_name="recent_task", user_id=user.id) + self.pruned_task = TaskResult.objects.create(task_id=uuid.uuid4().hex, status=states.SUCCESS, task_name="pruned_task") + self.failed_task = TaskResult.objects.create(task_id=uuid.uuid4().hex, status=states.FAILURE, task_name="failed_task") + self.recent_task = TaskResult.objects.create(task_id=uuid.uuid4().hex, status=states.SUCCESS, task_name="recent_task") # `date_done` uses `auto_now`, so manually set it done = datetime.now() - timedelta(days=8) diff --git a/contentcuration/contentcuration/tests/views/test_views_base.py b/contentcuration/contentcuration/tests/views/test_views_base.py index 74d0633971..8bf4b80726 100644 --- a/contentcuration/contentcuration/tests/views/test_views_base.py +++ b/contentcuration/contentcuration/tests/views/test_views_base.py @@ -2,10 +2,14 @@ """ Tests for contentcuration.views.internal functions. 
""" +import uuid + from django.urls import reverse_lazy +from django_celery_results.models import TaskResult from ..base import BaseAPITestCase -from contentcuration.models import TaskResult +from contentcuration.models import CustomTaskMetadata +from contentcuration.tests import testdata from contentcuration.utils.db_tools import TreeBuilder @@ -15,19 +19,37 @@ def test_200_get(self): self.user.save() main_tree = TreeBuilder(user=self.user) self.channel.main_tree = main_tree.root + channel_2 = testdata.channel() self.channel.save() + channel_2.save() self.channel.mark_publishing(self.user) + channel_2.mark_publishing(self.user) + task_id = uuid.uuid4().hex + task_id_2 = uuid.uuid4().hex task = TaskResult.objects.create( + task_id=task_id, + task_name="export-channel", + status="QUEUED", + ) + task_2 = TaskResult.objects.create( + task_id=task_id_2, task_name="export-channel", - channel_id=self.channel.id, - user_id=self.user.id, status="QUEUED", ) + CustomTaskMetadata(task_id=task_id, user=self.user, channel_id=self.channel.id).save() + CustomTaskMetadata(task_id=task_id_2, user=self.user, channel_id=channel_2.id).save() response = self.get( reverse_lazy("publishing_status"), ) + + expected_channel_ids = [self.channel.id, channel_2.id] + expected_channel_ids.sort() + self.assertEqual(response.status_code, 200) - self.assertEqual(1, len(response.data)) - self.assertEqual(self.channel.id, response.data[0]["channel_id"]) - self.assertEqual(task.task_id, response.data[0]["task_id"]) - self.assertIn("performed", response.data[0]) + self.assertEqual(2, len(response.data)) + + for i, item in enumerate(response.data): + self.assertEqual(expected_channel_ids[i], item["channel_id"]) + expected_task_id = task.task_id if item["channel_id"] == self.channel.id else task_2.task_id + self.assertEqual(expected_task_id, item["task_id"]) + self.assertIn("performed", item) diff --git a/contentcuration/contentcuration/tests/viewsets/test_channel.py 
b/contentcuration/contentcuration/tests/viewsets/test_channel.py index 02e8b6571f..9c49551ba8 100644 --- a/contentcuration/contentcuration/tests/viewsets/test_channel.py +++ b/contentcuration/contentcuration/tests/viewsets/test_channel.py @@ -299,8 +299,8 @@ def test_sync_channel_called_correctly(self, sync_channel_mock): ) self.assertEqual(response.status_code, 200) - self.assertEqual(sync_channel_mock.call_args.args[i], True) sync_channel_mock.assert_called_once() + self.assertEqual(sync_channel_mock.call_args.args[i], True) def test_deploy_channel_event(self): channel = testdata.channel() diff --git a/contentcuration/contentcuration/tests/viewsets/test_contentnode.py b/contentcuration/contentcuration/tests/viewsets/test_contentnode.py index a70f1a43ac..882b5644a9 100644 --- a/contentcuration/contentcuration/tests/viewsets/test_contentnode.py +++ b/contentcuration/contentcuration/tests/viewsets/test_contentnode.py @@ -373,7 +373,6 @@ def test_consolidate_extra_fields(self): self.viewset_url(pk=contentnode.id), format="json", ) self.assertEqual(response.status_code, 200, response.content) - print(response.data["extra_fields"]) self.assertEqual(response.data["extra_fields"]["options"]["completion_criteria"]["threshold"]["m"], 3) self.assertEqual(response.data["extra_fields"]["options"]["completion_criteria"]["threshold"]["n"], 6) self.assertEqual(response.data["extra_fields"]["options"]["completion_criteria"]["threshold"]["mastery_model"], exercises.M_OF_N) diff --git a/contentcuration/contentcuration/utils/celery/tasks.py b/contentcuration/contentcuration/utils/celery/tasks.py index cbbe53beba..df0f282304 100644 --- a/contentcuration/contentcuration/utils/celery/tasks.py +++ b/contentcuration/contentcuration/utils/celery/tasks.py @@ -10,7 +10,6 @@ from celery.app.task import Task from celery.result import AsyncResult from django.db import transaction -from django.db.utils import IntegrityError from contentcuration.constants.locking import TASK_LOCK from 
contentcuration.db.advisory_lock import advisory_lock @@ -69,9 +68,13 @@ def task_progress(self): def get_task_model(ref, task_id): """ Returns the task model for a task, will create one if not found - :rtype: contentcuration.models.TaskResult + :rtype: contentcuration.models.CustomTaskMetadata """ - return ref.backend.TaskModel.objects.get_task(task_id) + from contentcuration.models import CustomTaskMetadata + try: + return CustomTaskMetadata.objects.get(task_id=task_id) + except CustomTaskMetadata.DoesNotExist: + return None def generate_task_signature(task_name, task_kwargs=None, channel_id=None): @@ -166,10 +169,11 @@ def _lock_signature(self, signature): def find_ids(self, signature): """ :param signature: An hex string representing an md5 hash of task metadata - :return: A TaskResult queryset + :return: A CustomTaskMetadata queryset :rtype: django.db.models.query.QuerySet """ - return self.TaskModel.objects.filter(signature=signature)\ + from contentcuration.models import CustomTaskMetadata + return CustomTaskMetadata.objects.filter(signature=signature)\ .values_list("task_id", flat=True) def find_incomplete_ids(self, signature): @@ -178,7 +182,11 @@ def find_incomplete_ids(self, signature): :return: A TaskResult queryset :rtype: django.db.models.query.QuerySet """ - return self.find_ids(signature).filter(status__in=states.UNREADY_STATES) + from django_celery_results.models import TaskResult + # Get the filtered task_ids from CustomTaskMetadata model + filtered_task_ids = self.find_ids(signature) + task_objects_ids = TaskResult.objects.filter(task_id__in=filtered_task_ids, status__in=states.UNREADY_STATES).values_list("task_id", flat=True) + return task_objects_ids def fetch(self, task_id): """ @@ -191,8 +199,7 @@ def fetch(self, task_id): def enqueue(self, user, **kwargs): """ - Enqueues the task called with `kwargs`, and requires the user who wants to enqueue it. If `channel_id` is - passed to the function, that will be set on the TaskResult model as well. 
+ Enqueues the task called with `kwargs`, and requires the user who wants to enqueue it. :param user: User object of the user performing the operation :param kwargs: Keyword arguments for task `apply_async` @@ -200,6 +207,7 @@ def enqueue(self, user, **kwargs): :rtype: CeleryAsyncResult """ from contentcuration.models import User + from contentcuration.models import CustomTaskMetadata if user is None or not isinstance(user, User): raise TypeError("All tasks must be assigned to a user.") @@ -211,42 +219,31 @@ def enqueue(self, user, **kwargs): task_id = uuid.uuid4().hex prepared_kwargs = self._prepare_kwargs(kwargs) channel_id = prepared_kwargs.get("channel_id") + custom_task_result = CustomTaskMetadata( + task_id=task_id, + user=user, + signature=signature, + channel_id=channel_id + ) + custom_task_result.save() logging.info(f"Enqueuing task:id {self.name}:{task_id} for user:channel {user.pk}:{channel_id} | {signature}") # returns a CeleryAsyncResult async_result = self.apply_async( task_id=task_id, + task_name=self.name, kwargs=prepared_kwargs, ) # ensure the result is saved to the backend (database) self.backend.add_pending_result(async_result) - - saved = False - tries = 0 - while not saved: - # after calling apply, we should ideally have a task result model saved to the DB, but it relies on celery's - # event consumption, and we might try to retrieve it before it has actually saved, so we retry - try: - task_result = get_task_model(self, task_id) - task_result.task_name = self.name - task_result.task_kwargs = self.backend.encode(prepared_kwargs) - task_result.user = user - task_result.channel_id = channel_id - task_result.signature = signature - task_result.save() - saved = True - except IntegrityError as e: - tries += 1 - if tries > 3: - raise e return async_result def fetch_or_enqueue(self, user, **kwargs): """ Fetches an existing incomplete task or enqueues one if not found, called with `kwargs`, and requires the user - who wants to enqueue it. 
If `channel_id` is passed to the function, that will be set on the TaskResult model + who wants to enqueue it. If `channel_id` is passed to the function, that will be set on the CustomTaskMetadata model :param user: User object of the user performing the operation :param kwargs: Keyword arguments for task `apply_async` @@ -281,15 +278,16 @@ def requeue(self, **kwargs): :return: The celery async result :rtype: CeleryAsyncResult """ + from contentcuration.models import CustomTaskMetadata request = self.request if request is None: raise NotImplementedError("This method should only be called within the execution of a task") - task_result = get_task_model(self, request.id) task_kwargs = request.kwargs.copy() task_kwargs.update(kwargs) signature = self.generate_signature(kwargs) - logging.info(f"Re-queuing task {self.name} for user {task_result.user.pk} from {request.id} | {signature}") - return self.enqueue(task_result.user, signature=signature, **task_kwargs) + custom_task_metadata = CustomTaskMetadata.objects.get(task_id=request.id) + logging.info(f"Re-queuing task {self.name} for user {custom_task_metadata.user.pk} from {request.id} | {signature}") + return self.enqueue(custom_task_metadata.user, signature=signature, **task_kwargs) def revoke(self, exclude_task_ids=None, **kwargs): """ @@ -298,6 +296,7 @@ def revoke(self, exclude_task_ids=None, **kwargs): :param kwargs: Task keyword arguments that will be used to match against tasks :return: The number of tasks revoked """ + from django_celery_results.models import TaskResult signature = self.generate_signature(kwargs) task_ids = self.find_incomplete_ids(signature) @@ -309,33 +308,38 @@ def revoke(self, exclude_task_ids=None, **kwargs): self.app.control.revoke(task_id, terminate=True) count += 1 # be sure the database backend has these marked appropriately - self.TaskModel.objects.filter(task_id__in=task_ids).update(status=states.REVOKED) + TaskResult.objects.filter(task_id__in=task_ids).update(status=states.REVOKED) 
return count class CeleryAsyncResult(AsyncResult): """ Custom result class which has access to task data stored in the backend - - The properties access additional properties in the same manner as super properties, - and our custom properties are added to the meta via TaskResultCustom.as_dict() + We access those from the CustomTaskMetadata model. """ + _cached_model = None + def get_model(self): """ - :return: The TaskResult model object - :rtype: contentcuration.models.TaskResult + :return: The CustomTaskMetadata model object + :rtype: contentcuration.models.CustomTaskMetadata """ - return get_task_model(self, self.task_id) + if self._cached_model is None: + self._cached_model = get_task_model(self, self.task_id) + return self._cached_model @property def user_id(self): - return self._get_task_meta().get('user_id') + if self.get_model(): + return self.get_model().user_id @property def channel_id(self): - return self._get_task_meta().get('channel_id') + if self.get_model(): + return self.get_model().channel_id @property def progress(self): - return self._get_task_meta().get('progress') + if self.get_model(): + return self.get_model().progress diff --git a/contentcuration/contentcuration/utils/garbage_collect.py b/contentcuration/contentcuration/utils/garbage_collect.py index 44e09e4dab..805a114231 100755 --- a/contentcuration/contentcuration/utils/garbage_collect.py +++ b/contentcuration/contentcuration/utils/garbage_collect.py @@ -16,14 +16,15 @@ from django.db.models.expressions import Value from django.db.models.signals import post_delete from django.utils.timezone import now +from django_celery_results.models import TaskResult from le_utils.constants import content_kinds from contentcuration.constants import feature_flags from contentcuration.constants import user_history from contentcuration.db.models.functions import JSONObjectKeys from contentcuration.models import ContentNode +from contentcuration.models import CustomTaskMetadata from contentcuration.models
import File -from contentcuration.models import TaskResult from contentcuration.models import User from contentcuration.models import UserHistory @@ -164,8 +165,10 @@ def clean_up_tasks(): """ with DisablePostDeleteSignal(): date_cutoff = now() - datetime.timedelta(days=7) - count, _ = TaskResult.objects.filter(date_done__lt=date_cutoff, status__in=states.READY_STATES).delete() + tasks_to_delete = TaskResult.objects.filter(date_done__lt=date_cutoff, status__in=states.READY_STATES) + CustomTaskMetadata.objects.filter(task_id__in=tasks_to_delete.values_list("task_id", flat=True)).delete() + count, _ = tasks_to_delete.delete() logging.info("Deleted {} completed task(s) from the task table".format(count)) diff --git a/contentcuration/contentcuration/views/base.py b/contentcuration/contentcuration/views/base.py index 1f0eb999d4..2975bd1ed0 100644 --- a/contentcuration/contentcuration/views/base.py +++ b/contentcuration/contentcuration/views/base.py @@ -7,6 +7,7 @@ from django.core.cache import cache from django.core.exceptions import PermissionDenied from django.db.models import Count +from django.db.models import Exists from django.db.models import IntegerField from django.db.models import OuterRef from django.db.models import Subquery @@ -28,6 +29,7 @@ from django.utils.translation import LANGUAGE_SESSION_KEY from django.views.decorators.http import require_POST from django.views.i18n import LANGUAGE_QUERY_PARAMETER +from django_celery_results.models import TaskResult from rest_framework.authentication import BasicAuthentication from rest_framework.authentication import SessionAuthentication from rest_framework.authentication import TokenAuthentication @@ -47,10 +49,10 @@ from contentcuration.models import ChannelHistory from contentcuration.models import ChannelSet from contentcuration.models import ContentKind +from contentcuration.models import CustomTaskMetadata from contentcuration.models import DEFAULT_USER_PREFERENCES from contentcuration.models import Language 
from contentcuration.models import License -from contentcuration.models import TaskResult from contentcuration.serializers import SimplifiedChannelProbeCheckSerializer from contentcuration.utils.messages import get_messages from contentcuration.viewsets.channelset import PublicChannelSetSerializer @@ -132,17 +134,16 @@ def get_prober_channel(request): return Response(SimplifiedChannelProbeCheckSerializer(channel).data) - @api_view(["GET"]) @authentication_classes((TokenAuthentication, SessionAuthentication)) @permission_classes((IsAuthenticated,)) def publishing_status(request): if not request.user.is_admin: return HttpResponseForbidden() - + associated_custom_task_metadata_ids = CustomTaskMetadata.objects.filter(channel_id=Cast(OuterRef(OuterRef("channel_id")), UUIDField())).values_list("task_id",flat=True) associated_tasks = TaskResult.objects.filter( task_name="export-channel", - channel_id=Cast(OuterRef("channel_id"), UUIDField()), + task_id__in=Subquery(associated_custom_task_metadata_ids), ) channel_publish_status = ( ChannelHistory.objects diff --git a/contentcuration/contentcuration/viewsets/base.py b/contentcuration/contentcuration/viewsets/base.py index 18e1771de0..bbc83c4d92 100644 --- a/contentcuration/contentcuration/viewsets/base.py +++ b/contentcuration/contentcuration/viewsets/base.py @@ -10,6 +10,7 @@ from django.http import Http404 from django.http.request import HttpRequest from django_bulk_update.helper import bulk_update +from django_celery_results.models import TaskResult from django_filters.constants import EMPTY_VALUES from django_filters.rest_framework import DjangoFilterBackend from django_filters.rest_framework import FilterSet @@ -29,7 +30,8 @@ from rest_framework.viewsets import GenericViewSet from contentcuration.models import Change -from contentcuration.models import TaskResult +from contentcuration.models import CustomTaskMetadata +from contentcuration.utils.celery.tasks import generate_task_signature from 
contentcuration.utils.celery.tasks import ProgressTracker from contentcuration.viewsets.common import MissingRequiredParamsException from contentcuration.viewsets.sync.constants import TASK_ID @@ -929,24 +931,33 @@ def delete_from_changes(self, changes): @contextmanager def create_change_tracker(pk, table, channel_id, user, task_name): + task_kwargs = json.dumps({'pk': pk, 'table': table}) + # Clean up any previous tasks specific to this in case there were failures. - meta = json.dumps(dict(pk=pk, table=table)) - TaskResult.objects.filter(channel_id=channel_id, task_name=task_name, meta=meta).delete() + signature = generate_task_signature(task_name, task_kwargs=task_kwargs, channel_id=channel_id) + + task_id_to_delete = CustomTaskMetadata.objects.filter(channel_id=channel_id, signature=signature) + if task_id_to_delete: + TaskResult.objects.filter(task_id=task_id_to_delete, task_name=task_name).delete() task_id = uuid.uuid4().hex + task_object = TaskResult.objects.create( task_id=task_id, status=states.STARTED, - channel_id=channel_id, task_name=task_name, + ) + custom_task_metadata_object = CustomTaskMetadata.objects.create( + task_id=task_id, + channel_id=channel_id, user=user, - meta=meta + signature=signature ) def update_progress(progress=None): if progress: - task_object.progress = progress - task_object.save() + custom_task_metadata_object.progress = progress + custom_task_metadata_object.save() Change.create_change( generate_update_event(pk, table, {TASK_ID: task_object.task_id}, channel_id=channel_id), applied=True @@ -968,3 +979,4 @@ def update_progress(progress=None): generate_update_event(pk, table, {TASK_ID: None}, channel_id=channel_id), applied=True ) task_object.delete() + custom_task_metadata_object.delete() diff --git a/contentcuration/contentcuration/viewsets/sync/endpoint.py b/contentcuration/contentcuration/viewsets/sync/endpoint.py index a7b1e657fb..84d9734edf 100644 --- a/contentcuration/contentcuration/viewsets/sync/endpoint.py +++ 
b/contentcuration/contentcuration/viewsets/sync/endpoint.py @@ -4,7 +4,10 @@ bulk creates, updates, and deletes. """ from celery import states +from django.db.models import Exists +from django.db.models import OuterRef from django.db.models import Q +from django_celery_results.models import TaskResult from rest_framework.authentication import SessionAuthentication from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response @@ -12,7 +15,7 @@ from contentcuration.models import Change from contentcuration.models import Channel -from contentcuration.models import TaskResult +from contentcuration.models import CustomTaskMetadata from contentcuration.tasks import apply_channel_changes_task from contentcuration.tasks import apply_user_changes_task from contentcuration.viewsets.sync.constants import CHANNEL @@ -123,21 +126,40 @@ def return_changes(self, request, channel_revs): return {"changes": changes, "errors": errors, "successes": successes} def return_tasks(self, request, channel_revs): - tasks = TaskResult.objects.filter( - channel_id__in=channel_revs.keys(), - status__in=[states.STARTED, states.FAILURE], - ).exclude(task_name__in=[apply_channel_changes_task.name, apply_user_changes_task.name]) - return { - "tasks": tasks.values( - "task_id", - "task_name", - "traceback", - "progress", - "channel_id", - "status", + custom_task_objects = CustomTaskMetadata.objects.filter( + channel_id__in=channel_revs.keys() + ).annotate( + has_matching_task=Exists( + TaskResult.objects.filter( + task_id=OuterRef('task_id'), + status__in=[states.STARTED, states.FAILURE], + ).exclude( + task_name__in=[apply_channel_changes_task.name, apply_user_changes_task.name] + ) ) + ).filter( + has_matching_task=True + ) + + response_payload = { + "tasks": [], } + for custom_task in custom_task_objects: + task_data = { + "task_id": custom_task.task_id, + "task_name": custom_task.task_name, + "traceback": custom_task.task_result.traceback, + "progress": 
custom_task.progress, + "channel_id": custom_task.channel_id, + "status": custom_task.task_result.status, + } + + # Add the task data to the response_payload + response_payload["tasks"].append(task_data) + + return response_payload + def post(self, request): response_payload = { "disallowed": [],