From c88a02b86d6184fd39e5fa950df2e5af93b4ee71 Mon Sep 17 00:00:00 2001 From: Ishaan Rajiv Date: Sat, 7 Mar 2026 19:37:02 +0530 Subject: [PATCH] Avoid duplicate assistant text on item.completed fallback - track whether assistant text was already emitted per message - finalize with buffered text first; ignore completion detail if already streamed - add regression test for streaming + item.completed detail dedupe --- .../Layers/ProviderRuntimeIngestion.test.ts | 90 +++++++++++++++++++ .../Layers/ProviderRuntimeIngestion.ts | 50 +++++++++-- 2 files changed, 133 insertions(+), 7 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 96242b846c..b97c133083 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -850,6 +850,96 @@ describe("ProviderRuntimeIngestion", () => { expect(finalMessage?.streaming).toBe(false); }); + it("does not re-append streamed assistant text from item.completed detail", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-streaming-detail"), + threadId: ThreadId.makeUnsafe("thread-1"), + message: { + messageId: asMessageId("message-streaming-detail"), + role: "user", + text: "stream with detail", + attachments: [], + }, + assistantDeliveryMode: "streaming", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + await Effect.runPromise(Effect.sleep("30 millis")); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-streaming-detail"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-streaming-detail"), + }); + await waitForThread( + harness.engine, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-streaming-detail", + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-streaming-detail"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-streaming-detail"), + itemId: asItemId("item-streaming-detail"), + payload: { + streamKind: "assistant_text", + delta: "hello live", + }, + }); + await waitForThread( + harness.engine, + (thread) => + thread.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === "assistant:item-streaming-detail" && + message.streaming && + message.text === "hello live", + ), + ); + + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-message-completed-streaming-detail"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-streaming-detail"), + itemId: asItemId("item-streaming-detail"), + payload: { + itemType: "assistant_message", + status: "completed", + detail: "hello live", + }, + }); + + const finalThread = await waitForThread(harness.engine, (thread) => + thread.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === "assistant:item-streaming-detail" && !message.streaming, + ), + ); + const finalMessage = finalThread.messages.find( + (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-streaming-detail", + ); + expect(finalMessage?.text).toBe("hello live"); + expect(finalMessage?.streaming).toBe(false); + }); + it("spills oversized buffered deltas and still finalizes full assistant text", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 022a196674..f0c19e405d 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -30,6 +30,8 @@ const TURN_MESSAGE_IDS_BY_TURN_CACHE_CAPACITY = 10_000; const TURN_MESSAGE_IDS_BY_TURN_TTL = Duration.minutes(120); const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000; const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120); +const EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000; +const EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120); const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000; const BUFFERED_PROPOSED_PLAN_BY_ID_TTL = Duration.minutes(120); const MAX_BUFFERED_ASSISTANT_CHARS = 24_000; @@ -77,6 +79,20 @@ function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string return trimmed; } +function resolveAssistantCompletionText(input: { + bufferedText: string; + fallbackText: string | undefined; + hasEmittedText: boolean; +}): string { + if (input.bufferedText.length > 0) { + return input.bufferedText; + } + if (input.hasEmittedText) { + return ""; + } + return input.fallbackText?.trim().length ? input.fallbackText : ""; +} + function proposedPlanIdForTurn(threadId: ThreadId, turnId: TurnId): string { return `plan:${threadId}:turn:${turnId}`; } @@ -502,6 +518,11 @@ const make = Effect.gen(function* () { timeToLive: BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL, lookup: () => Effect.succeed(""), }); + const emittedAssistantTextByMessageId = yield* Cache.make({ + capacity: EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY, + timeToLive: EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_TTL, + lookup: () => Effect.succeed(false), + }); const bufferedProposedPlanById = yield* Cache.make({ capacity: BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY, @@ -610,6 +631,14 @@ const make = Effect.gen(function* () { const clearBufferedAssistantText = (messageId: MessageId) => Cache.invalidate(bufferedAssistantTextByMessageId, messageId); + const markAssistantTextAsEmitted = (messageId: MessageId) => + Cache.set(emittedAssistantTextByMessageId, messageId, true); + + const hasAssistantTextBeenEmitted = (messageId: MessageId) => + Cache.getOption(emittedAssistantTextByMessageId, messageId).pipe( + Effect.map((existingValue) => Option.getOrElse(existingValue, () => false)), + ); + const appendBufferedProposedPlan = (planId: string, delta: string, createdAt: string) => Cache.getOption(bufferedProposedPlanById, planId).pipe( Effect.flatMap((existingEntry) => { @@ -633,7 +662,11 @@ const make = Effect.gen(function* () { const clearBufferedProposedPlan = (planId: string) => Cache.invalidate(bufferedProposedPlanById, planId); - const clearAssistantMessageState = (messageId: MessageId) => clearBufferedAssistantText(messageId); + const clearAssistantMessageState = (messageId: MessageId) => + Effect.all([ + clearBufferedAssistantText(messageId), + Cache.invalidate(emittedAssistantTextByMessageId, messageId), + ]).pipe(Effect.asVoid); const finalizeAssistantMessage = (input: { event: ProviderRuntimeEvent; @@ -646,13 +679,13 @@ const make = Effect.gen(function* () { fallbackText?: string; }) => Effect.gen(function* () { + const hasEmittedText = yield* hasAssistantTextBeenEmitted(input.messageId); const bufferedText = yield* takeBufferedAssistantText(input.messageId); - const text = - bufferedText.length > 0 - ? bufferedText - : (input.fallbackText?.trim().length ?? 0) > 0 - ? input.fallbackText! - : ""; + const text = resolveAssistantCompletionText({ + bufferedText, + fallbackText: input.fallbackText, + hasEmittedText, + }); if (text.length > 0) { yield* orchestrationEngine.dispatch({ @@ -664,6 +697,7 @@ const make = Effect.gen(function* () { ...(input.turnId ? { turnId: input.turnId } : {}), createdAt: input.createdAt, }); + yield* markAssistantTextAsEmitted(input.messageId); } yield* orchestrationEngine.dispatch({ @@ -913,6 +947,7 @@ const make = Effect.gen(function* () { ...(turnId ? { turnId } : {}), createdAt: now, }); + yield* markAssistantTextAsEmitted(assistantMessageId); } } else { yield* orchestrationEngine.dispatch({ @@ -924,6 +959,7 @@ const make = Effect.gen(function* () { ...(turnId ? { turnId } : {}), createdAt: now, }); + yield* markAssistantTextAsEmitted(assistantMessageId); } }