Skip to content

fix: atomic claim for LlmQueue non-capture processing (#1190)#1200

Open
Chris0Jeky wants to merge 2 commits into
mainfrom
fix/1190-queue-claim-race
Open

fix: atomic claim for LlmQueue non-capture processing (#1190)#1200
Chris0Jeky wants to merge 2 commits into
mainfrom
fix/1190-queue-claim-race

Conversation

@Chris0Jeky
Copy link
Copy Markdown
Owner

Summary

Fixes #1190 -- ProcessNextRequestAsync and LlmQueueToProposalWorker.ProcessSingleItemAsync used a non-atomic fetch-then-mutate pattern for claiming pending LLM requests. Under concurrent workers, the same request could be claimed twice, producing duplicate proposals.

  • Add TryClaimProcessingAsync to ILlmQueueRepository / LlmQueueRepository that atomically transitions Pending -> Processing with an optimistic concurrency guard (WHERE Status = Pending AND UpdatedAt = @expected), mirroring the existing TryClaimProcessingCaptureAsync pattern
  • Update LlmQueueService.ProcessNextRequestAsync to use the atomic claim, iterating FIFO candidates and skipping any that fail the claim
  • Update LlmQueueToProposalWorker.ProcessSingleItemAsync to use TryClaimProcessingAsync instead of the racy GetByIdAsync + MarkAsProcessing + SaveChangesAsync flow
  • Pass ExpectedUpdatedAt for non-capture batch items in BuildFairBatchItems

Test plan

  • 6 new unit tests in LlmQueueServiceTests: claim success, claim failure (concurrent), fallthrough to next candidate, skip capture requests, empty queue, FIFO ordering
  • 4 new integration tests in LlmQueueRepositoryIntegrationTests: claim pending request, fail when status already changed, concurrent race (exactly one wins), reject non-pending request
  • Updated ProcessBatch_ItemClaimedBetweenFetchAndProcess_SkipsGracefully worker test to use atomic claim pattern
  • All 3,297 Application.Tests pass
  • All 1,744 Api.Tests pass
  • Build clean (0 errors)

ProcessNextRequestAsync and LlmQueueToProposalWorker.ProcessSingleItemAsync
used a non-atomic fetch-then-mutate pattern for claiming pending LLM requests.
Under concurrent workers, the same request could be claimed twice, producing
duplicate proposals.

Add TryClaimProcessingAsync to ILlmQueueRepository / LlmQueueRepository that
atomically transitions Pending -> Processing with an optimistic concurrency
guard (WHERE Status = Pending AND UpdatedAt = @expected), mirroring the
existing TryClaimProcessingCaptureAsync pattern.

Update both ProcessNextRequestAsync and ProcessSingleItemAsync to use the
atomic claim instead of the racy read-then-MarkAsProcessing-then-save flow.
ProcessNextRequestAsync now iterates candidates and skips any that fail the
atomic claim, falling through to the next FIFO candidate.

Tests: 6 new unit tests (service claim success, claim failure, fallthrough to
next candidate, skip capture requests, empty queue, FIFO ordering) + 4 new
integration tests (claim pending, fail when stale, concurrent race exactly-one,
reject non-pending). All 3,297 Application.Tests + 1,744 Api.Tests green.
@Chris0Jeky
Copy link
Copy Markdown
Owner Author

Adversarial Code Review

CRITICAL

  • None

HIGH

  • None

MEDIUM

  • M1: Orphaned Processing item on null re-fetch (LlmQueueService.cs line ~131): After TryClaimProcessingAsync succeeds, if GetByIdAsync returns null the code does continue to the next candidate. The already-claimed item is now stuck in Processing with no one to process or reset it. While extremely unlikely (requires a DELETE between the UPDATE and SELECT), the defensive continue silently orphans the item. Should log a warning. The worker path has the same pattern but is pre-existing; the proposal housekeeping worker handles stuck items. Adding a warning log on this edge case is sufficient.
  • M2: Snake_case variable name (LlmQueueService.cs line ~130): claimed_request uses snake_case, violating the C# camelCase convention established in the codebase. Should be claimedRequest.

LOW

  • L1: Misleading test name (LlmQueueServiceTests.cs): ProcessNextRequestAsync_ShouldReturnConflict_WhenClaimFails — the name says "Conflict" but the assertion correctly checks for ErrorCodes.NotFound. The name should reflect the actual behavior (all claims exhausted = NotFound).

Bot Comments Addressed

  • None (no bot comments at this time; CI still pending)

Summary

0 CRITICAL, 0 HIGH, 2 MEDIUM, 1 LOW. No merge blockers. All findings are fixable with minimal changes.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces optimistic concurrency for claiming pending non-capture requests in the LLM queue by adding and implementing TryClaimProcessingAsync using raw SQL updates. It updates the background worker and queue service to use this atomic claim mechanism and adds corresponding integration and unit tests. The review feedback identifies a critical bug in LlmQueueService.ProcessNextRequestAsync where re-fetching the claimed request via GetByIdAsync returns a stale, tracked in-memory entity from EF Core's cache instead of the updated database state. The reviewer suggests directly updating the tracked entity's state in-memory using candidate.MarkAsProcessing() to avoid an extra database roundtrip and ensure the correct status is returned.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +131 to +136
// Re-fetch so the in-memory entity reflects the DB state set by the atomic UPDATE.
var claimed_request = await _unitOfWork.LlmQueue.GetByIdAsync(candidate.Id);
if (claimed_request == null)
continue;

return Result.Success(MapToDto(claimed_request));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In EF Core, when entities are queried (such as via GetByStatusAsync at line 118), they are tracked by the DbContext in their current state (Pending). When TryClaimProcessingAsync executes a raw SQL UPDATE statement, it bypasses the EF Core change tracker.

Because of this, calling GetByIdAsync subsequently will return the already-tracked candidate instance from the EF Core identity map/local cache, which still has Status = Pending and the old UpdatedAt timestamp. It will not reflect the updated database state. This causes ProcessNextRequestAsync to return a DTO with Status = Pending, which is a bug.

Instead of performing a redundant and stale database query, you can simply transition the tracked candidate in-memory state using candidate.MarkAsProcessing(). This updates the tracked entity's status to Processing and touches its UpdatedAt timestamp, keeping the in-memory state in sync with the database without an extra database roundtrip.

                candidate.MarkAsProcessing();
                return Result.Success(MapToDto(candidate));

M1: Document orphaned-Processing edge case on null re-fetch after
successful TryClaimProcessingAsync claim with explanatory comment.
M2: Rename snake_case `claimed_request` to camelCase `claimedRequest`.
L1: Rename misleading test from ShouldReturnConflict to
ShouldReturnNotFound_WhenAllClaimsFail.
@Chris0Jeky
Copy link
Copy Markdown
Owner Author

Adversarial Review -- Fixes Applied

Finding Severity Fix Commit Verified
M1: Orphaned Processing item on null re-fetch MEDIUM `98cdd427` Documented with explanatory comment; housekeeping worker mitigates
M2: Snake_case `claimed_request` variable MEDIUM `98cdd427` Renamed to `claimedRequest`; build clean
L1: Misleading test name (Conflict vs NotFound) LOW `98cdd427` Renamed to `ShouldReturnNotFound_WhenAllClaimsFail`; 20/20 tests pass

All findings addressed. CI status: PENDING (new run triggered by fix push).

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0f7eb7a451

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".


return Result.Success(MapToDto(request));
// Re-fetch so the in-memory entity reflects the DB state set by the atomic UPDATE.
var claimed_request = await _unitOfWork.LlmQueue.GetByIdAsync(candidate.Id);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Refresh the tracked request after claiming it

When ProcessNextRequestAsync uses the EF repository, GetByStatusAsync materializes the pending candidates in the same DbContext; after TryClaimProcessingAsync updates the row with raw SQL, GetByIdAsync delegates to FindAsync, which returns that already-tracked stale entity rather than the database row. As a result, a successful POST /api/llm-queue/process-next can return a DTO whose status is still Pending even though the row has been atomically claimed as Processing, regressing the API contract the updated tests expect. Reload/detach the candidate or query no-tracking before mapping the claimed request.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Pending

Development

Successfully merging this pull request may close these issues.

LlmQueueService.ProcessNextRequestAsync race condition with worker (non-atomic claim)

1 participant