Skip to content

Commit 570d0d7

Browse files
committed
(PC-38926)[API] fix: remove batching from Ubble async task
Ubble async task has its own rate limiting queue that should be a bit smaller than the Ubble rate limiting queue. To make sure that the global Ubble rate limiting queue does not overflow, then the Ubble async task should treat ids one by one instead of by batches.
1 parent 57fd5c0 commit 570d0d7

File tree

5 files changed

+29
-36
lines changed

5 files changed

+29
-36
lines changed

api/src/pcapi/core/subscription/api.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@ def _update_fraud_check_eligibility_with_history(
632632
eligibilityType=eligibility,
633633
)
634634
db.session.add(new_fraud_check)
635+
db.session.flush()
635636

636637
# Cancel the old fraud check
637638
fraud_check.status = subscription_models.FraudCheckStatus.CANCELED

api/src/pcapi/core/subscription/ubble/api.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ def start_ubble_workflow(
188188
content = _reattempt_identity_verification(ubble_fraud_check, first_name, last_name, redirect_url, webhook_url)
189189
except ubble.UbbleConflictError:
190190
# resync the identity verification
191-
ubble_tasks.update_ubble_workflow_task(
192-
ubble_schemas.UpdateWorkflowPayload(beneficiary_fraud_check_ids=[ubble_fraud_check.id])
191+
ubble_tasks.update_ubble_workflow_task.delay(
192+
payload=ubble_schemas.UpdateWorkflowPayload(beneficiary_fraud_check_id=ubble_fraud_check.id).model_dump()
193193
)
194194

195195
raise
@@ -450,16 +450,12 @@ def recover_pending_ubble_applications(dry_run: bool = True) -> None:
450450
pending_ubble_application_counter = 0
451451
for pending_ubble_application_fraud_checks in _get_pending_fraud_checks_pages():
452452
pending_ubble_application_counter += len(pending_ubble_application_fraud_checks)
453-
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_UBBLE.is_active():
454-
ubble_tasks.update_ubble_workflow_task(
455-
ubble_schemas.UpdateWorkflowPayload(
456-
beneficiary_fraud_check_ids=[
457-
fraud_check.id for fraud_check in pending_ubble_application_fraud_checks
458-
]
453+
for fraud_check in pending_ubble_application_fraud_checks:
454+
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_UBBLE.is_active():
455+
ubble_tasks.update_ubble_workflow_task.delay(
456+
payload=ubble_schemas.UpdateWorkflowPayload(beneficiary_fraud_check_id=fraud_check.id).model_dump()
459457
)
460-
)
461-
else:
462-
for fraud_check in pending_ubble_application_fraud_checks:
458+
else:
463459
try:
464460
with atomic():
465461
update_ubble_workflow(fraud_check)

api/src/pcapi/core/subscription/ubble/schemas.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,4 @@ def get_id_piece_number(self) -> str | None:
129129

130130

131131
class UpdateWorkflowPayload(BaseModelV2):
132-
beneficiary_fraud_check_ids: list[int]
132+
beneficiary_fraud_check_id: int

api/src/pcapi/core/subscription/ubble/tasks.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
logger = logging.getLogger(__name__)
1818

1919
# always leave some bandwidth for manual Ubble call through native app
20-
UBBLE_TASK_RATE_LIMIT = int(settings.UBBLE_RATE_LIMIT * 0.8)
20+
UBBLE_TASK_RATE_LIMIT = int(settings.UBBLE_RATE_LIMIT * 0.9)
2121

2222

2323
@celery_async_task(
@@ -30,31 +30,23 @@
3030
def update_ubble_workflow_task(payload: ubble_schemas.UpdateWorkflowPayload) -> None:
3131
fraud_check_stmt = (
3232
sa.select(subscription_models.BeneficiaryFraudCheck)
33-
.where(subscription_models.BeneficiaryFraudCheck.id.in_(payload.beneficiary_fraud_check_ids))
33+
.where(subscription_models.BeneficiaryFraudCheck.id == payload.beneficiary_fraud_check_id)
3434
.options(
3535
sa.orm.joinedload(subscription_models.BeneficiaryFraudCheck.user)
3636
.selectinload(users_models.User.deposits)
3737
.selectinload(finance_models.Deposit.recredits)
3838
)
3939
)
40-
fraud_checks_to_update = db.session.scalars(fraud_check_stmt).all()
40+
fraud_check = db.session.scalars(fraud_check_stmt).one()
4141

42-
for fraud_check in fraud_checks_to_update:
43-
try:
44-
with atomic():
45-
ubble_api.update_ubble_workflow(fraud_check)
46-
except Exception as e:
47-
logger.error(
48-
"Error while updating pending ubble application",
49-
extra={"fraud_check_id": fraud_check.id, "ubble_id": fraud_check.thirdPartyId, "exc": str(e)},
50-
)
51-
continue
42+
with atomic():
43+
ubble_api.update_ubble_workflow(fraud_check)
5244

53-
if fraud_check.status == subscription_models.FraudCheckStatus.PENDING:
54-
logger.error(
55-
"Pending ubble application still pending after 12 hours. This is a problem on the Ubble side.",
56-
extra={"fraud_check_id": fraud_check.id, "ubble_id": fraud_check.thirdPartyId},
57-
)
45+
if fraud_check.status == subscription_models.FraudCheckStatus.PENDING:
46+
logger.error(
47+
"Pending ubble application still pending after 12 hours. This is a problem on the Ubble side.",
48+
extra={"fraud_check_id": fraud_check.id, "ubble_id": fraud_check.thirdPartyId},
49+
)
5850

5951

6052
def update_ubble_workflow_if_needed(
@@ -68,16 +60,14 @@ def update_ubble_workflow_if_needed(
6860
"""
6961
if status in ubble_api.PENDING_STATUSES:
7062
fraud_check.status = subscription_models.FraudCheckStatus.PENDING
71-
db.session.commit()
7263
return
7364

7465
if status in ubble_api.CANCELED_STATUSES:
7566
fraud_check.status = subscription_models.FraudCheckStatus.CANCELED
76-
db.session.commit()
7767
return
7868

7969
if status not in ubble_api.CONCLUSIVE_STATUSES:
8070
return
8171

82-
payload = ubble_schemas.UpdateWorkflowPayload(beneficiary_fraud_check_ids=[fraud_check.id])
83-
update_ubble_workflow_task.delay(payload.dict())
72+
payload = ubble_schemas.UpdateWorkflowPayload(beneficiary_fraud_check_id=fraud_check.id)
73+
update_ubble_workflow_task.delay(payload.model_dump())

api/tests/core/subscription/ubble/test_api.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,7 +1469,7 @@ def test_not_eligible(self):
14691469

14701470

14711471
@pytest.mark.usefixtures("db_session")
1472-
@patch("pcapi.core.subscription.ubble.api.update_ubble_workflow")
1472+
@patch("pcapi.core.subscription.ubble.tasks.update_ubble_workflow_task.delay")
14731473
def test_pending_and_created_fraud_checks_are_updated(update_ubble_workflow_mock):
14741474
yesterday = datetime.date.today() - relativedelta(hours=13)
14751475
created_fraud_check = BeneficiaryFraudCheckFactory(
@@ -1481,4 +1481,10 @@ def test_pending_and_created_fraud_checks_are_updated(update_ubble_workflow_mock
14811481

14821482
ubble_subscription_api.recover_pending_ubble_applications()
14831483

1484-
update_ubble_workflow_mock.assert_has_calls([call(created_fraud_check), call(pending_fraud_check)], any_order=True)
1484+
update_ubble_workflow_mock.assert_has_calls(
1485+
[
1486+
call(payload={"beneficiary_fraud_check_id": created_fraud_check.id}),
1487+
call(payload={"beneficiary_fraud_check_id": pending_fraud_check.id}),
1488+
],
1489+
any_order=True,
1490+
)

0 commit comments

Comments
 (0)