Skip to content

Commit d8a2274

Browse files
committed
(PC-39002)[API] perf: do not excessively stack ubble celery tasks
The previous recovery cron took 170 000 fraud checks and sent them directly to Celery. This causes a lot of tasks to be rate limited and stored in the workers memory, leading to an OOMKill. With the celery feature flag on, the cron will only stack what Celery can realistically do in one hour. The keep the recovery going, the cron should be called every hour.
1 parent f6e4a49 commit d8a2274

File tree

4 files changed

+80
-26
lines changed

4 files changed

+80
-26
lines changed

api/src/pcapi/celery_tasks/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
# These values will prevent retrying tasks too far in the future as this can have negative effects
2020
# because they are loaded by the worker. These can be increased memory consumption and risk of losing tasks.
2121
# (https://docs.celeryq.dev/en/latest/userguide/calling.html#eta-and-countdown)
22-
MAX_TIME_WINDOW_SIZE = 600
23-
MAX_RETRY_DURATION = 1200
22+
MAX_TIME_WINDOW_SIZE = 600 # 10 minutes
23+
MAX_RETRY_DURATION = 1200 # 20 minutes
2424

2525
logger = logging.getLogger(__name__)
2626

api/src/pcapi/core/subscription/ubble/api.py

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
logger = logging.getLogger(__name__)
4747
PAGE_SIZE = 20_000
48+
CELERY_PAGE_SIZE = 3_000
4849

4950
PENDING_STATUSES = [
5051
ubble_schemas.UbbleIdentificationStatus.PROCESSING,
@@ -75,10 +76,12 @@ def update_ubble_workflow_with_status(
7576
"""
7677
if status in PENDING_STATUSES:
7778
fraud_check.status = subscription_models.FraudCheckStatus.PENDING
79+
fraud_check.updatedAt = datetime.datetime.now(datetime.timezone.utc)
7880
return
7981

8082
if status in CANCELED_STATUSES:
8183
fraud_check.status = subscription_models.FraudCheckStatus.CANCELED
84+
fraud_check.updatedAt = datetime.datetime.now(datetime.timezone.utc)
8285
return
8386

8487
if status not in CONCLUSIVE_STATUSES:
@@ -97,10 +100,12 @@ def update_ubble_workflow(fraud_check: subscription_models.BeneficiaryFraudCheck
97100
status = content.status
98101
if status in PENDING_STATUSES:
99102
fraud_check.status = subscription_models.FraudCheckStatus.PENDING
103+
fraud_check.updatedAt = datetime.datetime.now(datetime.timezone.utc)
100104
return
101105

102106
if status in CANCELED_STATUSES:
103107
fraud_check.status = subscription_models.FraudCheckStatus.CANCELED
108+
fraud_check.updatedAt = datetime.datetime.now(datetime.timezone.utc)
104109
return
105110

106111
if status not in CONCLUSIVE_STATUSES:
@@ -447,30 +452,38 @@ def recover_pending_ubble_applications(dry_run: bool = True) -> None:
447452
"""
448453
from pcapi.core.subscription.ubble import tasks as ubble_tasks
449454

455+
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_UBBLE.is_active():
456+
stale_ubble_fraud_check_ids = _get_stale_fraud_checks_ids(CELERY_PAGE_SIZE)
457+
for fraud_check_id in stale_ubble_fraud_check_ids:
458+
ubble_tasks.update_ubble_workflow_task.delay(
459+
payload=ubble_schemas.UpdateWorkflowPayload(beneficiary_fraud_check_id=fraud_check_id).model_dump()
460+
)
461+
462+
logger.warning(
463+
"Found %d stale ubble application older than 12 hours and tried to update them.",
464+
len(stale_ubble_fraud_check_ids),
465+
)
466+
return
467+
450468
pending_ubble_application_counter = 0
451-
for pending_ubble_application_fraud_checks in _get_pending_fraud_checks_pages():
452-
pending_ubble_application_counter += len(pending_ubble_application_fraud_checks)
453-
for fraud_check in pending_ubble_application_fraud_checks:
454-
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_UBBLE.is_active():
455-
ubble_tasks.update_ubble_workflow_task.delay(
456-
payload=ubble_schemas.UpdateWorkflowPayload(beneficiary_fraud_check_id=fraud_check.id).model_dump()
469+
for pending_ubble_fraud_check_ids in _get_pending_fraud_checks_pages():
470+
pending_ubble_application_counter += len(pending_ubble_fraud_check_ids)
471+
for fraud_check in pending_ubble_fraud_check_ids:
472+
try:
473+
with atomic():
474+
update_ubble_workflow(fraud_check)
475+
except Exception as exc:
476+
logger.error(
477+
"Error while updating pending ubble application",
478+
extra={"fraud_check_id": fraud_check.id, "ubble_id": fraud_check.thirdPartyId, "exc": str(exc)},
479+
)
480+
continue
481+
db.session.refresh(fraud_check)
482+
if fraud_check.status == subscription_models.FraudCheckStatus.PENDING:
483+
logger.error(
484+
"Pending ubble application still pending after 12 hours. This is a problem on the Ubble side.",
485+
extra={"fraud_check_id": fraud_check.id, "ubble_id": fraud_check.thirdPartyId},
457486
)
458-
else:
459-
try:
460-
with atomic():
461-
update_ubble_workflow(fraud_check)
462-
except Exception as exc:
463-
logger.error(
464-
"Error while updating pending ubble application",
465-
extra={"fraud_check_id": fraud_check.id, "ubble_id": fraud_check.thirdPartyId, "exc": str(exc)},
466-
)
467-
continue
468-
db.session.refresh(fraud_check)
469-
if fraud_check.status == subscription_models.FraudCheckStatus.PENDING:
470-
logger.error(
471-
"Pending ubble application still pending after 12 hours. This is a problem on the Ubble side.",
472-
extra={"fraud_check_id": fraud_check.id, "ubble_id": fraud_check.thirdPartyId},
473-
)
474487

475488
if pending_ubble_application_counter > 0:
476489
logger.warning(
@@ -481,6 +494,38 @@ def recover_pending_ubble_applications(dry_run: bool = True) -> None:
481494
logger.info("No pending ubble application found older than 12 hours. This is good.")
482495

483496

497+
def _get_stale_fraud_checks_ids(page_size: int) -> typing.Sequence[int]:
498+
"""
499+
Returns the `page_size` first stale and pending fraud checks.
500+
This function only returns the first page, and is meant to be called every hour by recovery workers.
501+
"""
502+
from pcapi.celery_tasks.tasks import MAX_RETRY_DURATION
503+
from pcapi.core.subscription.ubble import tasks as ubble_tasks
504+
505+
# Celery workers keep all rate limited tasks in memory so we need to limit task stacking lest the workers get OOMKilled
506+
page_size_limit = ubble_tasks.UBBLE_TASK_RATE_LIMIT * (MAX_RETRY_DURATION / 60) # ~ 3500 elements
507+
if page_size > page_size_limit:
508+
raise ValueError(f"{page_size = } is above {page_size_limit = }")
509+
510+
# Ubble guarantees an application is processed after 3 hours.
511+
# We give ourselves some extra time and we retrieve the applications that are still pending after 12 hours.
512+
twelve_hours_ago = datetime.date.today() - datetime.timedelta(hours=12)
513+
stale_fraud_check_ids_stmt = (
514+
sa.select(subscription_models.BeneficiaryFraudCheck.id)
515+
.filter(
516+
subscription_models.BeneficiaryFraudCheck.type == subscription_models.FraudCheckType.UBBLE,
517+
subscription_models.BeneficiaryFraudCheck.status.in_(
518+
[subscription_models.FraudCheckStatus.STARTED, subscription_models.FraudCheckStatus.PENDING]
519+
),
520+
subscription_models.BeneficiaryFraudCheck.updatedAt < twelve_hours_ago,
521+
)
522+
.order_by(subscription_models.BeneficiaryFraudCheck.id)
523+
.limit(page_size)
524+
)
525+
526+
return db.session.scalars(stale_fraud_check_ids_stmt).all()
527+
528+
484529
def _get_pending_fraud_checks_pages() -> typing.Generator[list[subscription_models.BeneficiaryFraudCheck], None, None]:
485530
# Ubble guarantees an application is processed after 3 hours.
486531
# We give ourselves some extra time and we retrieve the applications that are still pending after 12 hours.

api/src/pcapi/core/subscription/ubble/tasks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import logging
23

34
import sqlalchemy as sa
@@ -60,10 +61,12 @@ def update_ubble_workflow_if_needed(
6061
"""
6162
if status in ubble_api.PENDING_STATUSES:
6263
fraud_check.status = subscription_models.FraudCheckStatus.PENDING
64+
fraud_check.updatedAt = datetime.datetime.now(datetime.timezone.utc)
6365
return
6466

6567
if status in ubble_api.CANCELED_STATUSES:
6668
fraud_check.status = subscription_models.FraudCheckStatus.CANCELED
69+
fraud_check.updatedAt = datetime.datetime.now(datetime.timezone.utc)
6770
return
6871

6972
if status not in ubble_api.CONCLUSIVE_STATUSES:

api/tests/core/subscription/ubble/test_api.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,10 +1474,16 @@ def test_not_eligible(self):
14741474
def test_pending_and_created_fraud_checks_are_updated(update_ubble_workflow_mock):
14751475
yesterday = datetime.date.today() - relativedelta(hours=13)
14761476
created_fraud_check = BeneficiaryFraudCheckFactory(
1477-
type=FraudCheckType.UBBLE, status=FraudCheckStatus.STARTED, dateCreated=yesterday
1477+
type=FraudCheckType.UBBLE,
1478+
status=FraudCheckStatus.STARTED,
1479+
dateCreated=yesterday,
1480+
updatedAt=yesterday,
14781481
)
14791482
pending_fraud_check = BeneficiaryFraudCheckFactory(
1480-
type=FraudCheckType.UBBLE, status=FraudCheckStatus.PENDING, dateCreated=yesterday
1483+
type=FraudCheckType.UBBLE,
1484+
status=FraudCheckStatus.PENDING,
1485+
dateCreated=yesterday,
1486+
updatedAt=yesterday,
14811487
)
14821488

14831489
ubble_subscription_api.recover_pending_ubble_applications()

0 commit comments

Comments
 (0)