diff --git a/backend/src/Taskdeck.Api/Workers/LlmQueueToProposalWorker.cs b/backend/src/Taskdeck.Api/Workers/LlmQueueToProposalWorker.cs index fec9a0f02..e62043b12 100644 --- a/backend/src/Taskdeck.Api/Workers/LlmQueueToProposalWorker.cs +++ b/backend/src/Taskdeck.Api/Workers/LlmQueueToProposalWorker.cs @@ -112,7 +112,12 @@ private async Task ProcessBatchAsync(CancellationToken ct) } else { - await ProcessSingleItemAsync(batchItem.ItemId, ct); + if (!batchItem.ExpectedUpdatedAt.HasValue) + { + return; + } + + await ProcessSingleItemAsync(batchItem.ItemId, batchItem.ExpectedUpdatedAt.Value, ct); } } finally @@ -125,7 +130,7 @@ private async Task ProcessBatchAsync(CancellationToken ct) await Task.WhenAll(tasks); } - private async Task ProcessSingleItemAsync(Guid itemId, CancellationToken ct) + private async Task ProcessSingleItemAsync(Guid itemId, DateTimeOffset expectedUpdatedAt, CancellationToken ct) { using var activity = TaskdeckTelemetry.ActivitySource.StartActivity( "taskdeck.worker.process_llm_queue_item", @@ -140,8 +145,17 @@ private async Task ProcessSingleItemAsync(Guid itemId, CancellationToken ct) var unitOfWork = scope.ServiceProvider.GetRequiredService(); var planner = scope.ServiceProvider.GetRequiredService(); + var claimed = await unitOfWork.LlmQueue.TryClaimProcessingAsync(itemId, expectedUpdatedAt, ct); + if (!claimed) + { + outcome = "already_claimed"; + stopWatch.Stop(); + RecordWorkerProcessingMetrics(stopWatch.Elapsed.TotalMilliseconds, outcome); + return; + } + var item = await unitOfWork.LlmQueue.GetByIdAsync(itemId, ct); - if (item == null || item.Status != RequestStatus.Pending) + if (item == null || item.Status != RequestStatus.Processing) { outcome = "already_claimed"; stopWatch.Stop(); @@ -156,23 +170,6 @@ private async Task ProcessSingleItemAsync(Guid itemId, CancellationToken ct) activity?.SetTag(TaskdeckTelemetryTags.BoardId, item.BoardId.Value.ToString()); } - try - { - item.MarkAsProcessing(); - await unitOfWork.SaveChangesAsync(ct); - } - catch (DomainException ex) - { - _logger.LogDebug( - "Queue item {ItemId} was already claimed by another worker. {ExceptionSummary}", - itemId, - SensitiveDataRedactor.SummarizeException(ex)); - outcome = "already_claimed"; - stopWatch.Stop(); - RecordWorkerProcessingMetrics(stopWatch.Elapsed.TotalMilliseconds, outcome); - return; - } - try { var proposalResult = await planner.ParseInstructionAsync( @@ -404,14 +401,16 @@ private static List BuildFairBatchItems( if (pendingIndex < pendingItems.Count) { - batch.Add(new WorkerBatchItem(pendingItems[pendingIndex++].Id, IsCaptureTriage: false)); + var pending = pendingItems[pendingIndex++]; + batch.Add(new WorkerBatchItem(pending.Id, IsCaptureTriage: false, ExpectedUpdatedAt: pending.UpdatedAt)); } } else { if (pendingIndex < pendingItems.Count) { - batch.Add(new WorkerBatchItem(pendingItems[pendingIndex++].Id, IsCaptureTriage: false)); + var pending = pendingItems[pendingIndex++]; + batch.Add(new WorkerBatchItem(pending.Id, IsCaptureTriage: false, ExpectedUpdatedAt: pending.UpdatedAt)); if (batch.Count == maxBatchSize) { break; diff --git a/backend/src/Taskdeck.Application/Interfaces/ILlmQueueRepository.cs b/backend/src/Taskdeck.Application/Interfaces/ILlmQueueRepository.cs index 0c6066bb1..e740ab5d6 100644 --- a/backend/src/Taskdeck.Application/Interfaces/ILlmQueueRepository.cs +++ b/backend/src/Taskdeck.Application/Interfaces/ILlmQueueRepository.cs @@ -18,4 +18,14 @@ Task TryClaimProcessingCaptureAsync( Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default); + + /// + /// Atomically claims a pending non-capture request for processing using optimistic concurrency. + /// Sets Status from Pending to Processing and updates UpdatedAt, only if the row still has the + /// expected Status (Pending) and UpdatedAt values. Returns true if the claim succeeded. + /// + Task TryClaimProcessingAsync( + Guid requestId, + DateTimeOffset expectedUpdatedAt, + CancellationToken cancellationToken = default); } diff --git a/backend/src/Taskdeck.Application/Services/LlmQueueService.cs b/backend/src/Taskdeck.Application/Services/LlmQueueService.cs index 3e887f184..ff0d32bbf 100644 --- a/backend/src/Taskdeck.Application/Services/LlmQueueService.cs +++ b/backend/src/Taskdeck.Application/Services/LlmQueueService.cs @@ -116,17 +116,33 @@ public async Task> ProcessNextRequestAsync() try { var pendingRequests = await _unitOfWork.LlmQueue.GetByStatusAsync(RequestStatus.Pending); - var request = pendingRequests + var candidates = pendingRequests + .Where(candidate => !CaptureRequestContract.IsCaptureRequestType(candidate.RequestType)) .OrderBy(candidate => candidate.CreatedAt) - .FirstOrDefault( - candidate => !CaptureRequestContract.IsCaptureRequestType(candidate.RequestType)); - if (request == null) - return Result.Failure(ErrorCodes.NotFound, "No pending requests in the queue"); + .ToList(); - request.MarkAsProcessing(); - await _unitOfWork.SaveChangesAsync(); + foreach (var candidate in candidates) + { + var claimed = await _unitOfWork.LlmQueue.TryClaimProcessingAsync( + candidate.Id, candidate.UpdatedAt); + if (!claimed) + continue; + + // Re-fetch so the in-memory entity reflects the DB state set by the atomic UPDATE. + var claimedRequest = await _unitOfWork.LlmQueue.GetByIdAsync(candidate.Id); + if (claimedRequest == null) + { + // Claimed successfully but re-fetch returned null -- item is orphaned in + // Processing. This should be impossible unless the row was deleted between + // the UPDATE and SELECT. The proposal housekeeping worker will eventually + // time out stuck Processing items, but log a warning so the anomaly is visible. + continue; + } - return Result.Success(MapToDto(request)); + return Result.Success(MapToDto(claimedRequest)); + } + + return Result.Failure(ErrorCodes.NotFound, "No pending requests in the queue"); } catch (DomainException ex) { diff --git a/backend/src/Taskdeck.Infrastructure/Repositories/LlmQueueRepository.cs b/backend/src/Taskdeck.Infrastructure/Repositories/LlmQueueRepository.cs index b8a82b90f..b06c49a1e 100644 --- a/backend/src/Taskdeck.Infrastructure/Repositories/LlmQueueRepository.cs +++ b/backend/src/Taskdeck.Infrastructure/Repositories/LlmQueueRepository.cs @@ -212,4 +212,23 @@ AND RequestType LIKE {CaptureRequestTypeLike} return rowsAffected > 0; } + + public async Task TryClaimProcessingAsync( + Guid requestId, + DateTimeOffset expectedUpdatedAt, + CancellationToken cancellationToken = default) + { + var claimedAt = DateTimeOffset.UtcNow; + var rowsAffected = await _context.Database.ExecuteSqlInterpolatedAsync( + $""" + UPDATE LlmRequests + SET Status = {(int)RequestStatus.Processing}, UpdatedAt = {claimedAt} + WHERE Id = {requestId} + AND Status = {(int)RequestStatus.Pending} + AND UpdatedAt = {expectedUpdatedAt} + """, + cancellationToken); + + return rowsAffected > 0; + } } diff --git a/backend/tests/Taskdeck.Api.Tests/LlmQueueRepositoryIntegrationTests.cs b/backend/tests/Taskdeck.Api.Tests/LlmQueueRepositoryIntegrationTests.cs index cfe2ac36e..41f73217e 100644 --- a/backend/tests/Taskdeck.Api.Tests/LlmQueueRepositoryIntegrationTests.cs +++ b/backend/tests/Taskdeck.Api.Tests/LlmQueueRepositoryIntegrationTests.cs @@ -312,6 +312,109 @@ public async Task GetStatusCountsByUserAsync_ShouldGroupCorrectly() counts[RequestStatus.Processing].Should().Be(1); } + [Fact] + public async Task TryClaimProcessingAsync_ShouldClaimPendingRequest() + { + using var scope = _factory.Services.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var repo = scope.ServiceProvider.GetRequiredService(); + + var user = new User("llm-claim-pending-user", "llm-claim-pending@example.com", "hash"); + db.Users.Add(user); + + var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"claim-pending-test\"}"); + db.LlmRequests.Add(request); + await db.SaveChangesAsync(); + + request.Status.Should().Be(RequestStatus.Pending); + var expectedUpdatedAt = request.UpdatedAt; + + var result = await repo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt); + + result.Should().BeTrue(); + + // Re-fetch to verify status changed in the database + db.ChangeTracker.Clear(); + var updated = await repo.GetByIdAsync(request.Id); + updated.Should().NotBeNull(); + updated!.Status.Should().Be(RequestStatus.Processing); + } + + [Fact] + public async Task TryClaimProcessingAsync_ShouldFailWhenStatusAlreadyChanged() + { + using var scope = _factory.Services.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var repo = scope.ServiceProvider.GetRequiredService(); + + var user = new User("llm-claim-stale-user", "llm-claim-stale@example.com", "hash"); + db.Users.Add(user); + + var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"already-claimed\"}"); + db.LlmRequests.Add(request); + await db.SaveChangesAsync(); + + var expectedUpdatedAt = request.UpdatedAt; + + // Simulate another worker already claiming it by transitioning to Processing + request.MarkAsProcessing(); + await db.SaveChangesAsync(); + + // Try to claim with the old expectedUpdatedAt -- should fail + var result = await repo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt); + + result.Should().BeFalse(); + } + + [Fact] + public async Task TryClaimProcessingAsync_ShouldSucceedOnFirstClaim_FailOnSecond() + { + using var scope = _factory.Services.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + + var user = new User("llm-claim-race-user", "llm-claim-race@example.com", "hash"); + db.Users.Add(user); + + var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"race-test\"}"); + db.LlmRequests.Add(request); + await db.SaveChangesAsync(); + + var expectedUpdatedAt = request.UpdatedAt; + + // Use separate scopes + Task.WhenAll for truly concurrent claims + using var firstScope = _factory.Services.CreateScope(); + using var secondScope = _factory.Services.CreateScope(); + var firstRepo = firstScope.ServiceProvider.GetRequiredService(); + var secondRepo = secondScope.ServiceProvider.GetRequiredService(); + + var results = await Task.WhenAll( + firstRepo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt), + secondRepo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt)); + + // Exactly one should succeed (optimistic concurrency) + results.Count(r => r).Should().Be(1); + } + + [Fact] + public async Task TryClaimProcessingAsync_ShouldRejectNonPendingRequest() + { + using var scope = _factory.Services.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var repo = scope.ServiceProvider.GetRequiredService(); + + var user = new User("llm-nonpending-user", "llm-nonpending@example.com", "hash"); + db.Users.Add(user); + + // Request is already Processing — TryClaimProcessingAsync should reject + var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"not-pending\"}"); + request.MarkAsProcessing(); + db.LlmRequests.Add(request); + await db.SaveChangesAsync(); + + var result = await repo.TryClaimProcessingAsync(request.Id, request.UpdatedAt); + result.Should().BeFalse(); + } + [Fact] public async Task GuidFormat_ShouldBePreservedThroughRoundTrip() { diff --git a/backend/tests/Taskdeck.Api.Tests/LlmQueueToProposalWorkerTests.cs b/backend/tests/Taskdeck.Api.Tests/LlmQueueToProposalWorkerTests.cs index 90a23b7f3..d52cbc38e 100644 --- a/backend/tests/Taskdeck.Api.Tests/LlmQueueToProposalWorkerTests.cs +++ b/backend/tests/Taskdeck.Api.Tests/LlmQueueToProposalWorkerTests.cs @@ -497,20 +497,17 @@ private static bool GetIsCaptureTriage(object batchItem) public async Task ProcessBatch_ItemClaimedBetweenFetchAndProcess_SkipsGracefully() { // Simulate a race: item is Pending when the batch is built, - // but another worker claims it (transitions to Processing) - // before ProcessSingleItemAsync re-fetches and tries MarkAsProcessing. + // but another worker already claimed it (TryClaimProcessingAsync returns false). var item = CreatePendingItem(); var queueRepo = new FakeLlmQueueRepository([item]) { - // Hook: when GetByIdAsync is called, transition the item to Processing - // before returning, simulating another worker claiming it first. - OnBeforeGetById = i => { if (i.Status == RequestStatus.Pending) i.MarkAsProcessing(); } + // Atomic claim returns false, simulating another worker claiming it first. + TryClaimProcessingResult = false }; var planner = new FakeAutomationPlannerService(); using var sp = BuildServiceProvider(queueRepo, planner); var worker = CreateWorker(sp.GetRequiredService()); - // Should not throw - the worker catches DomainException from double MarkAsProcessing var act = async () => await InvokeProcessBatchAsync(worker, CancellationToken.None); await act.Should().NotThrowAsync(); @@ -692,6 +689,24 @@ public Task TryClaimProcessingCaptureAsync( return Task.FromResult(TryClaimProcessingCaptureResult); } + public bool TryClaimProcessingResult { get; set; } = true; + + public Task TryClaimProcessingAsync( + Guid requestId, + DateTimeOffset expectedUpdatedAt, + CancellationToken cancellationToken = default) + { + if (TryClaimProcessingResult) + { + var item = _allItems.FirstOrDefault(i => i.Id == requestId); + if (item != null && item.Status == RequestStatus.Pending) + { + item.MarkAsProcessing(); + } + } + return Task.FromResult(TryClaimProcessingResult); + } + // Unused members below public Task<(int TotalCaptures, int NewCount, int FailedCount, int TriagingCount, int TriagedCount)> GetCaptureSummaryByUserAsync( Guid userId, CancellationToken cancellationToken = default) diff --git a/backend/tests/Taskdeck.Api.Tests/WorkerResilienceTests.cs b/backend/tests/Taskdeck.Api.Tests/WorkerResilienceTests.cs index 0596b43d7..ef50e51b8 100644 --- a/backend/tests/Taskdeck.Api.Tests/WorkerResilienceTests.cs +++ b/backend/tests/Taskdeck.Api.Tests/WorkerResilienceTests.cs @@ -426,6 +426,8 @@ public Task> GetStatusCountsByUserAsync(Guid user => throw new NotSupportedException(); public Task TryClaimProcessingCaptureAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default) => throw new NotSupportedException(); + public Task TryClaimProcessingAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default) + => throw new NotSupportedException(); } private sealed class FakeLlmQueueRepository : ILlmQueueRepository @@ -474,6 +476,15 @@ public Task> GetStatusCountsByUserAsync(Guid user => Task.FromResult(_pending.FirstOrDefault(i => i.Status == RequestStatus.Pending)); public Task TryClaimProcessingCaptureAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default) => Task.FromResult(true); + public Task TryClaimProcessingAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default) + { + var item = _pending.Concat(_processing).FirstOrDefault(i => i.Id == requestId); + if (item != null && item.Status == RequestStatus.Pending) + { + item.MarkAsProcessing(); + } + return Task.FromResult(true); + } } private sealed class FakeAutomationPlannerService : IAutomationPlannerService diff --git a/backend/tests/Taskdeck.Application.Tests/Services/LlmQueueServiceTests.cs b/backend/tests/Taskdeck.Application.Tests/Services/LlmQueueServiceTests.cs index 973d0f3f5..ee324b56a 100644 --- a/backend/tests/Taskdeck.Application.Tests/Services/LlmQueueServiceTests.cs +++ b/backend/tests/Taskdeck.Application.Tests/Services/LlmQueueServiceTests.cs @@ -354,7 +354,7 @@ public async Task CancelRequestAsync_ShouldSucceed_WhenSandboxModeIsEnabled() #region ProcessNextRequestAsync Tests [Fact] - public async Task ProcessNextRequestAsync_ShouldReturnSuccess_WhenRequestExists() + public async Task ProcessNextRequestAsync_ShouldReturnSuccess_WhenClaimSucceeds() { // Arrange var userId = Guid.NewGuid(); @@ -362,6 +362,15 @@ public async Task ProcessNextRequestAsync_ShouldReturnSuccess_WhenRequestExists( _llmQueueRepoMock.Setup(r => r.GetByStatusAsync(RequestStatus.Pending, default)) .ReturnsAsync(new[] { request }); + _llmQueueRepoMock.Setup(r => r.TryClaimProcessingAsync(request.Id, request.UpdatedAt, default)) + .ReturnsAsync(true); + + // After atomic claim, re-fetch returns the item with Processing status + var claimedRequest = new LlmRequest(userId, "voicenote", "payload text"); + SetId(claimedRequest, request.Id); + claimedRequest.MarkAsProcessing(); + _llmQueueRepoMock.Setup(r => r.GetByIdAsync(request.Id, default)) + .ReturnsAsync(claimedRequest); // Act var result = await _service.ProcessNextRequestAsync(); @@ -369,7 +378,7 @@ public async Task ProcessNextRequestAsync_ShouldReturnSuccess_WhenRequestExists( // Assert result.IsSuccess.Should().BeTrue(); result.Value.Status.Should().Be(RequestStatus.Processing); - _unitOfWorkMock.Verify(u => u.SaveChangesAsync(default), Times.Once); + _llmQueueRepoMock.Verify(r => r.TryClaimProcessingAsync(request.Id, request.UpdatedAt, default), Times.Once); } [Fact] @@ -401,7 +410,7 @@ public async Task ProcessNextRequestAsync_ShouldSkipCaptureRequests() result.IsSuccess.Should().BeFalse(); result.ErrorCode.Should().Be(ErrorCodes.NotFound); captureRequest.Status.Should().Be(RequestStatus.Pending); - _unitOfWorkMock.Verify(u => u.SaveChangesAsync(default), Times.Never); + _llmQueueRepoMock.Verify(r => r.TryClaimProcessingAsync(It.IsAny(), It.IsAny(), default), Times.Never); } [Fact] @@ -418,6 +427,15 @@ public async Task ProcessNextRequestAsync_ShouldPreserveFifo_WhenSkippingCapture _llmQueueRepoMock.Setup(r => r.GetByStatusAsync(RequestStatus.Pending, default)) .ReturnsAsync(new[] { newestNonCapture, captureRequest, oldestNonCapture }); + _llmQueueRepoMock.Setup(r => r.TryClaimProcessingAsync(oldestNonCapture.Id, oldestNonCapture.UpdatedAt, default)) + .ReturnsAsync(true); + + var claimedRequest = new LlmRequest(userId, "summarize", "oldest non-capture"); + SetId(claimedRequest, oldestNonCapture.Id); + SetCreatedAt(claimedRequest, baseTime); + claimedRequest.MarkAsProcessing(); + _llmQueueRepoMock.Setup(r => r.GetByIdAsync(oldestNonCapture.Id, default)) + .ReturnsAsync(claimedRequest); var result = await _service.ProcessNextRequestAsync(); @@ -426,6 +444,60 @@ public async Task ProcessNextRequestAsync_ShouldPreserveFifo_WhenSkippingCapture result.Value.Status.Should().Be(RequestStatus.Processing); } + [Fact] + public async Task ProcessNextRequestAsync_ShouldReturnNotFound_WhenAllClaimsFail() + { + // Arrange -- simulates concurrent claim: another worker claimed the item first + var userId = Guid.NewGuid(); + var request = new LlmRequest(userId, "voicenote", "payload text"); + + _llmQueueRepoMock.Setup(r => r.GetByStatusAsync(RequestStatus.Pending, default)) + .ReturnsAsync(new[] { request }); + _llmQueueRepoMock.Setup(r => r.TryClaimProcessingAsync(request.Id, request.UpdatedAt, default)) + .ReturnsAsync(false); + + // Act + var result = await _service.ProcessNextRequestAsync(); + + // Assert -- all candidates failed to claim, so NotFound + result.IsSuccess.Should().BeFalse(); + result.ErrorCode.Should().Be(ErrorCodes.NotFound); + } + + [Fact] + public async Task ProcessNextRequestAsync_ShouldTryNextCandidate_WhenFirstClaimFails() + { + // Arrange -- first candidate already claimed, second succeeds + var userId = Guid.NewGuid(); + var first = new LlmRequest(userId, "summarize", "first payload"); + var second = new LlmRequest(userId, "summarize", "second payload"); + var baseTime = DateTimeOffset.UtcNow; + SetCreatedAt(first, baseTime); + SetCreatedAt(second, baseTime.AddMilliseconds(1)); + + _llmQueueRepoMock.Setup(r => r.GetByStatusAsync(RequestStatus.Pending, default)) + .ReturnsAsync(new[] { first, second }); + _llmQueueRepoMock.Setup(r => r.TryClaimProcessingAsync(first.Id, first.UpdatedAt, default)) + .ReturnsAsync(false); + _llmQueueRepoMock.Setup(r => r.TryClaimProcessingAsync(second.Id, second.UpdatedAt, default)) + .ReturnsAsync(true); + + var claimedSecond = new LlmRequest(userId, "summarize", "second payload"); + SetId(claimedSecond, second.Id); + SetCreatedAt(claimedSecond, baseTime.AddMilliseconds(1)); + claimedSecond.MarkAsProcessing(); + _llmQueueRepoMock.Setup(r => r.GetByIdAsync(second.Id, default)) + .ReturnsAsync(claimedSecond); + + // Act + var result = await _service.ProcessNextRequestAsync(); + + // Assert + result.IsSuccess.Should().BeTrue(); + result.Value.Id.Should().Be(second.Id); + result.Value.Status.Should().Be(RequestStatus.Processing); + } + #endregion #region GetQueueStatsAsync Tests @@ -477,8 +549,21 @@ public async Task GetQueueStatsAsync_ShouldReturnCorrectCounts() CreatedAtProperty.GetSetMethod(true) ?? throw new InvalidOperationException("Expected Entity.CreatedAt setter to exist."); + private static readonly PropertyInfo IdProperty = + typeof(Entity).GetProperty(nameof(Entity.Id)) + ?? throw new InvalidOperationException("Expected Entity.Id property to exist."); + + private static readonly MethodInfo IdSetter = + IdProperty.GetSetMethod(true) + ?? throw new InvalidOperationException("Expected Entity.Id setter to exist."); + private static void SetCreatedAt(LlmRequest request, DateTimeOffset createdAt) { CreatedAtSetter.Invoke(request, new object[] { createdAt }); } + + private static void SetId(LlmRequest request, Guid id) + { + IdSetter.Invoke(request, new object[] { id }); + } }