Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions backend/src/Taskdeck.Api/Workers/LlmQueueToProposalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ private async Task ProcessBatchAsync(CancellationToken ct)
}
else
{
await ProcessSingleItemAsync(batchItem.ItemId, ct);
if (!batchItem.ExpectedUpdatedAt.HasValue)
{
return;
}

await ProcessSingleItemAsync(batchItem.ItemId, batchItem.ExpectedUpdatedAt.Value, ct);
}
}
finally
Expand All @@ -125,7 +130,7 @@ private async Task ProcessBatchAsync(CancellationToken ct)
await Task.WhenAll(tasks);
}

private async Task ProcessSingleItemAsync(Guid itemId, CancellationToken ct)
private async Task ProcessSingleItemAsync(Guid itemId, DateTimeOffset expectedUpdatedAt, CancellationToken ct)
{
using var activity = TaskdeckTelemetry.ActivitySource.StartActivity(
"taskdeck.worker.process_llm_queue_item",
Expand All @@ -140,8 +145,17 @@ private async Task ProcessSingleItemAsync(Guid itemId, CancellationToken ct)
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
var planner = scope.ServiceProvider.GetRequiredService<IAutomationPlannerService>();

var claimed = await unitOfWork.LlmQueue.TryClaimProcessingAsync(itemId, expectedUpdatedAt, ct);
if (!claimed)
{
outcome = "already_claimed";
stopWatch.Stop();
RecordWorkerProcessingMetrics(stopWatch.Elapsed.TotalMilliseconds, outcome);
return;
}

var item = await unitOfWork.LlmQueue.GetByIdAsync(itemId, ct);
if (item == null || item.Status != RequestStatus.Pending)
if (item == null || item.Status != RequestStatus.Processing)
{
outcome = "already_claimed";
stopWatch.Stop();
Expand All @@ -156,23 +170,6 @@ private async Task ProcessSingleItemAsync(Guid itemId, CancellationToken ct)
activity?.SetTag(TaskdeckTelemetryTags.BoardId, item.BoardId.Value.ToString());
}

try
{
item.MarkAsProcessing();
await unitOfWork.SaveChangesAsync(ct);
}
catch (DomainException ex)
{
_logger.LogDebug(
"Queue item {ItemId} was already claimed by another worker. {ExceptionSummary}",
itemId,
SensitiveDataRedactor.SummarizeException(ex));
outcome = "already_claimed";
stopWatch.Stop();
RecordWorkerProcessingMetrics(stopWatch.Elapsed.TotalMilliseconds, outcome);
return;
}

try
{
var proposalResult = await planner.ParseInstructionAsync(
Expand Down Expand Up @@ -404,14 +401,16 @@ private static List<WorkerBatchItem> BuildFairBatchItems(

if (pendingIndex < pendingItems.Count)
{
batch.Add(new WorkerBatchItem(pendingItems[pendingIndex++].Id, IsCaptureTriage: false));
var pending = pendingItems[pendingIndex++];
batch.Add(new WorkerBatchItem(pending.Id, IsCaptureTriage: false, ExpectedUpdatedAt: pending.UpdatedAt));
}
}
else
{
if (pendingIndex < pendingItems.Count)
{
batch.Add(new WorkerBatchItem(pendingItems[pendingIndex++].Id, IsCaptureTriage: false));
var pending = pendingItems[pendingIndex++];
batch.Add(new WorkerBatchItem(pending.Id, IsCaptureTriage: false, ExpectedUpdatedAt: pending.UpdatedAt));
if (batch.Count == maxBatchSize)
{
break;
Expand Down
10 changes: 10 additions & 0 deletions backend/src/Taskdeck.Application/Interfaces/ILlmQueueRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,14 @@ Task<bool> TryClaimProcessingCaptureAsync(
Guid requestId,
DateTimeOffset expectedUpdatedAt,
CancellationToken cancellationToken = default);

/// <summary>
/// Atomically claims a pending non-capture request for processing using optimistic concurrency.
/// Sets Status from Pending to Processing and updates UpdatedAt, only if the row still has the
/// expected Status (Pending) and UpdatedAt values. Returns true if the claim succeeded.
/// </summary>
Task<bool> TryClaimProcessingAsync(
Guid requestId,
DateTimeOffset expectedUpdatedAt,
CancellationToken cancellationToken = default);
}
32 changes: 24 additions & 8 deletions backend/src/Taskdeck.Application/Services/LlmQueueService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,33 @@ public async Task<Result<LlmRequestDto>> ProcessNextRequestAsync()
try
{
var pendingRequests = await _unitOfWork.LlmQueue.GetByStatusAsync(RequestStatus.Pending);
var request = pendingRequests
var candidates = pendingRequests
.Where(candidate => !CaptureRequestContract.IsCaptureRequestType(candidate.RequestType))
.OrderBy(candidate => candidate.CreatedAt)
.FirstOrDefault(
candidate => !CaptureRequestContract.IsCaptureRequestType(candidate.RequestType));
if (request == null)
return Result.Failure<LlmRequestDto>(ErrorCodes.NotFound, "No pending requests in the queue");
.ToList();

request.MarkAsProcessing();
await _unitOfWork.SaveChangesAsync();
foreach (var candidate in candidates)
{
var claimed = await _unitOfWork.LlmQueue.TryClaimProcessingAsync(
candidate.Id, candidate.UpdatedAt);
if (!claimed)
continue;

// Re-fetch so the in-memory entity reflects the DB state set by the atomic UPDATE.
var claimedRequest = await _unitOfWork.LlmQueue.GetByIdAsync(candidate.Id);
if (claimedRequest == null)
{
// Claimed successfully but re-fetch returned null -- item is orphaned in
// Processing. This should be impossible unless the row was deleted between
// the UPDATE and SELECT. The proposal housekeeping worker will eventually
// time out stuck Processing items, but log a warning so the anomaly is visible.
continue;
}

return Result.Success(MapToDto(request));
return Result.Success(MapToDto(claimedRequest));
}

return Result.Failure<LlmRequestDto>(ErrorCodes.NotFound, "No pending requests in the queue");
}
catch (DomainException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,23 @@ AND RequestType LIKE {CaptureRequestTypeLike}

return rowsAffected > 0;
}

public async Task<bool> TryClaimProcessingAsync(
Guid requestId,
DateTimeOffset expectedUpdatedAt,
CancellationToken cancellationToken = default)
{
var claimedAt = DateTimeOffset.UtcNow;
var rowsAffected = await _context.Database.ExecuteSqlInterpolatedAsync(
$"""
UPDATE LlmRequests
SET Status = {(int)RequestStatus.Processing}, UpdatedAt = {claimedAt}
WHERE Id = {requestId}
AND Status = {(int)RequestStatus.Pending}
AND UpdatedAt = {expectedUpdatedAt}
""",
cancellationToken);

return rowsAffected > 0;
}
}
103 changes: 103 additions & 0 deletions backend/tests/Taskdeck.Api.Tests/LlmQueueRepositoryIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,109 @@ public async Task GetStatusCountsByUserAsync_ShouldGroupCorrectly()
counts[RequestStatus.Processing].Should().Be(1);
}

[Fact]
public async Task TryClaimProcessingAsync_ShouldClaimPendingRequest()
{
using var scope = _factory.Services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<TaskdeckDbContext>();
var repo = scope.ServiceProvider.GetRequiredService<ILlmQueueRepository>();

var user = new User("llm-claim-pending-user", "llm-claim-pending@example.com", "hash");
db.Users.Add(user);

var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"claim-pending-test\"}");
db.LlmRequests.Add(request);
await db.SaveChangesAsync();

request.Status.Should().Be(RequestStatus.Pending);
var expectedUpdatedAt = request.UpdatedAt;

var result = await repo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt);

result.Should().BeTrue();

// Re-fetch to verify status changed in the database
db.ChangeTracker.Clear();
var updated = await repo.GetByIdAsync(request.Id);
updated.Should().NotBeNull();
updated!.Status.Should().Be(RequestStatus.Processing);
}

[Fact]
public async Task TryClaimProcessingAsync_ShouldFailWhenStatusAlreadyChanged()
{
using var scope = _factory.Services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<TaskdeckDbContext>();
var repo = scope.ServiceProvider.GetRequiredService<ILlmQueueRepository>();

var user = new User("llm-claim-stale-user", "llm-claim-stale@example.com", "hash");
db.Users.Add(user);

var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"already-claimed\"}");
db.LlmRequests.Add(request);
await db.SaveChangesAsync();

var expectedUpdatedAt = request.UpdatedAt;

// Simulate another worker already claiming it by transitioning to Processing
request.MarkAsProcessing();
await db.SaveChangesAsync();

// Try to claim with the old expectedUpdatedAt -- should fail
var result = await repo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt);

result.Should().BeFalse();
}

[Fact]
public async Task TryClaimProcessingAsync_ShouldSucceedOnFirstClaim_FailOnSecond()
{
using var scope = _factory.Services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<TaskdeckDbContext>();

var user = new User("llm-claim-race-user", "llm-claim-race@example.com", "hash");
db.Users.Add(user);

var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"race-test\"}");
db.LlmRequests.Add(request);
await db.SaveChangesAsync();

var expectedUpdatedAt = request.UpdatedAt;

// Use separate scopes + Task.WhenAll for truly concurrent claims
using var firstScope = _factory.Services.CreateScope();
using var secondScope = _factory.Services.CreateScope();
var firstRepo = firstScope.ServiceProvider.GetRequiredService<ILlmQueueRepository>();
var secondRepo = secondScope.ServiceProvider.GetRequiredService<ILlmQueueRepository>();

var results = await Task.WhenAll(
firstRepo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt),
secondRepo.TryClaimProcessingAsync(request.Id, expectedUpdatedAt));

// Exactly one should succeed (optimistic concurrency)
results.Count(r => r).Should().Be(1);
}

[Fact]
public async Task TryClaimProcessingAsync_ShouldRejectNonPendingRequest()
{
using var scope = _factory.Services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<TaskdeckDbContext>();
var repo = scope.ServiceProvider.GetRequiredService<ILlmQueueRepository>();

var user = new User("llm-nonpending-user", "llm-nonpending@example.com", "hash");
db.Users.Add(user);

// Request is already Processing — TryClaimProcessingAsync should reject
var request = new LlmRequest(user.Id, "chat.completion", "{\"text\":\"not-pending\"}");
request.MarkAsProcessing();
db.LlmRequests.Add(request);
await db.SaveChangesAsync();

var result = await repo.TryClaimProcessingAsync(request.Id, request.UpdatedAt);
result.Should().BeFalse();
}

[Fact]
public async Task GuidFormat_ShouldBePreservedThroughRoundTrip()
{
Expand Down
27 changes: 21 additions & 6 deletions backend/tests/Taskdeck.Api.Tests/LlmQueueToProposalWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -497,20 +497,17 @@ private static bool GetIsCaptureTriage(object batchItem)
public async Task ProcessBatch_ItemClaimedBetweenFetchAndProcess_SkipsGracefully()
{
// Simulate a race: item is Pending when the batch is built,
// but another worker claims it (transitions to Processing)
// before ProcessSingleItemAsync re-fetches and tries MarkAsProcessing.
// but another worker already claimed it (TryClaimProcessingAsync returns false).
var item = CreatePendingItem();
var queueRepo = new FakeLlmQueueRepository([item])
{
// Hook: when GetByIdAsync is called, transition the item to Processing
// before returning, simulating another worker claiming it first.
OnBeforeGetById = i => { if (i.Status == RequestStatus.Pending) i.MarkAsProcessing(); }
// Atomic claim returns false, simulating another worker claiming it first.
TryClaimProcessingResult = false
};
var planner = new FakeAutomationPlannerService();
using var sp = BuildServiceProvider(queueRepo, planner);
var worker = CreateWorker(sp.GetRequiredService<IServiceScopeFactory>());

// Should not throw - the worker catches DomainException from double MarkAsProcessing
var act = async () => await InvokeProcessBatchAsync(worker, CancellationToken.None);
await act.Should().NotThrowAsync();

Expand Down Expand Up @@ -692,6 +689,24 @@ public Task<bool> TryClaimProcessingCaptureAsync(
return Task.FromResult(TryClaimProcessingCaptureResult);
}

public bool TryClaimProcessingResult { get; set; } = true;

public Task<bool> TryClaimProcessingAsync(
Guid requestId,
DateTimeOffset expectedUpdatedAt,
CancellationToken cancellationToken = default)
{
if (TryClaimProcessingResult)
{
var item = _allItems.FirstOrDefault(i => i.Id == requestId);
if (item != null && item.Status == RequestStatus.Pending)
{
item.MarkAsProcessing();
}
}
return Task.FromResult(TryClaimProcessingResult);
}

// Unused members below
public Task<(int TotalCaptures, int NewCount, int FailedCount, int TriagingCount, int TriagedCount)> GetCaptureSummaryByUserAsync(
Guid userId, CancellationToken cancellationToken = default)
Expand Down
11 changes: 11 additions & 0 deletions backend/tests/Taskdeck.Api.Tests/WorkerResilienceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ public Task<Dictionary<RequestStatus, int>> GetStatusCountsByUserAsync(Guid user
=> throw new NotSupportedException();
public Task<bool> TryClaimProcessingCaptureAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
public Task<bool> TryClaimProcessingAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
}

private sealed class FakeLlmQueueRepository : ILlmQueueRepository
Expand Down Expand Up @@ -474,6 +476,15 @@ public Task<Dictionary<RequestStatus, int>> GetStatusCountsByUserAsync(Guid user
=> Task.FromResult(_pending.FirstOrDefault(i => i.Status == RequestStatus.Pending));
public Task<bool> TryClaimProcessingCaptureAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default)
=> Task.FromResult(true);
public Task<bool> TryClaimProcessingAsync(Guid requestId, DateTimeOffset expectedUpdatedAt, CancellationToken cancellationToken = default)
{
var item = _pending.Concat(_processing).FirstOrDefault(i => i.Id == requestId);
if (item != null && item.Status == RequestStatus.Pending)
{
item.MarkAsProcessing();
}
return Task.FromResult(true);
}
}

private sealed class FakeAutomationPlannerService : IAutomationPlannerService
Expand Down
Loading
Loading