Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,96 @@ describe("ProviderRuntimeIngestion", () => {
expect(finalMessage?.streaming).toBe(false);
});

// Regression test: once assistant text has been delivered through a streamed
// content.delta, the `detail` carried by the matching item.completed event must
// not be appended to the message a second time.
it("does not re-append streamed assistant text from item.completed detail", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

// Start a user turn that requests streaming assistant delivery.
await Effect.runPromise(
harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.makeUnsafe("cmd-turn-start-streaming-detail"),
threadId: ThreadId.makeUnsafe("thread-1"),
message: {
messageId: asMessageId("message-streaming-detail"),
role: "user",
text: "stream with detail",
attachments: [],
},
assistantDeliveryMode: "streaming",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
}),
);
// NOTE(review): short pause before emitting provider events — presumably lets the
// dispatched command settle; confirm against the harness.
await Effect.runPromise(Effect.sleep("30 millis"));

// Provider reports the turn as started; wait until the thread session shows it
// running with this turn as the active one.
harness.emit({
type: "turn.started",
eventId: asEventId("evt-turn-started-streaming-detail"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-streaming-detail"),
});
await waitForThread(
harness.engine,
(thread) =>
thread.session?.status === "running" &&
thread.session?.activeTurnId === "turn-streaming-detail",
);

// Stream one assistant text delta and wait until the still-streaming assistant
// message carries exactly that text.
harness.emit({
type: "content.delta",
eventId: asEventId("evt-message-delta-streaming-detail"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-streaming-detail"),
itemId: asItemId("item-streaming-detail"),
payload: {
streamKind: "assistant_text",
delta: "hello live",
},
});
await waitForThread(
harness.engine,
(thread) =>
thread.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-streaming-detail" &&
message.streaming &&
message.text === "hello live",
),
);

// Complete the same item; `detail` repeats the text that was already streamed.
harness.emit({
type: "item.completed",
eventId: asEventId("evt-message-completed-streaming-detail"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-streaming-detail"),
itemId: asItemId("item-streaming-detail"),
payload: {
itemType: "assistant_message",
status: "completed",
detail: "hello live",
},
});

// Wait for the message to leave the streaming state, then check that its text is
// the streamed text only (i.e. not "hello livehello live").
const finalThread = await waitForThread(harness.engine, (thread) =>
thread.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-streaming-detail" && !message.streaming,
),
);
const finalMessage = finalThread.messages.find(
(message: ProviderRuntimeTestMessage) => message.id === "assistant:item-streaming-detail",
);
expect(finalMessage?.text).toBe("hello live");
expect(finalMessage?.streaming).toBe(false);
});

it("spills oversized buffered deltas and still finalizes full assistant text", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
50 changes: 43 additions & 7 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const TURN_MESSAGE_IDS_BY_TURN_CACHE_CAPACITY = 10_000;
// NOTE(review): presumably the time-to-live for the turn -> message-ids cache; the
// cache itself is outside this hunk — confirm at the use site.
const TURN_MESSAGE_IDS_BY_TURN_TTL = Duration.minutes(120);
// Sizing for the cache of buffered message text keyed by message id.
const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000;
const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120);
// Sizing for the cache of per-message "assistant text already emitted" flags.
const EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000;
const EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120);
// Sizing for the cache of buffered proposed plans keyed by plan id.
const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000;
const BUFFERED_PROPOSED_PLAN_BY_ID_TTL = Duration.minutes(120);
// NOTE(review): presumably the character limit for buffered assistant text before it
// is flushed — the use site is not visible here; confirm.
const MAX_BUFFERED_ASSISTANT_CHARS = 24_000;
Expand Down Expand Up @@ -77,6 +79,20 @@ function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string
return trimmed;
}

/**
 * Chooses the text to append when an assistant message completes.
 *
 * Precedence:
 * 1. Any non-empty buffered text is returned as-is.
 * 2. Otherwise, if text was already emitted for the message, nothing more is
 *    appended (empty string), so the completion payload is not duplicated.
 * 3. Otherwise the fallback text is returned unchanged, unless it is missing
 *    or consists only of whitespace, in which case the result is empty.
 *
 * @param input.bufferedText - Text still buffered for the message.
 * @param input.fallbackText - Text supplied with the completion, if any.
 * @param input.hasEmittedText - Whether text was already emitted for the message.
 * @returns The text to append, or `""` when nothing should be appended.
 */
function resolveAssistantCompletionText(input: {
  bufferedText: string;
  fallbackText: string | undefined;
  hasEmittedText: boolean;
}): string {
  const { bufferedText, fallbackText, hasEmittedText } = input;

  if (bufferedText !== "") {
    return bufferedText;
  }

  // Already-emitted text wins over the fallback; a missing fallback yields nothing.
  if (hasEmittedText || fallbackText === undefined) {
    return "";
  }

  // Whitespace-only fallbacks count as empty; real text keeps its original padding.
  const isBlank = fallbackText.trim() === "";
  return isBlank ? "" : fallbackText;
}

/**
 * Builds the proposed-plan identifier for a turn of a thread, in the form
 * `plan:<threadId>:turn:<turnId>`.
 */
function proposedPlanIdForTurn(threadId: ThreadId, turnId: TurnId): string {
  const segments = ["plan", threadId, "turn", turnId];
  return segments.join(":");
}
Expand Down Expand Up @@ -502,6 +518,11 @@ const make = Effect.gen(function* () {
timeToLive: BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL,
lookup: () => Effect.succeed(""),
});
// Per-message boolean flag cache keyed by message id. The lookup resolves to
// `false`, so a message id with no stored entry is treated as "no assistant
// text emitted yet".
const emittedAssistantTextByMessageId = yield* Cache.make<MessageId, boolean>({
capacity: EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY,
timeToLive: EMITTED_ASSISTANT_TEXT_BY_MESSAGE_ID_TTL,
lookup: () => Effect.succeed(false),
});

const bufferedProposedPlanById = yield* Cache.make<string, { text: string; createdAt: string }>({
capacity: BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY,
Expand Down Expand Up @@ -610,6 +631,14 @@ const make = Effect.gen(function* () {
// Drops any buffered assistant text stored for the given message id.
const clearBufferedAssistantText = (messageId: MessageId) => {
  return Cache.invalidate(bufferedAssistantTextByMessageId, messageId);
};

// Records that assistant text has been emitted for the given message id.
const markAssistantTextAsEmitted = (messageId: MessageId) => {
  return Cache.set(emittedAssistantTextByMessageId, messageId, true);
};

// Reports whether assistant text was already emitted for the given message id;
// a message id with no stored flag counts as not emitted.
const hasAssistantTextBeenEmitted = (messageId: MessageId) => {
  const storedFlag = Cache.getOption(emittedAssistantTextByMessageId, messageId);
  return Effect.map(storedFlag, (maybeFlag) => Option.getOrElse(maybeFlag, () => false));
};

const appendBufferedProposedPlan = (planId: string, delta: string, createdAt: string) =>
Cache.getOption(bufferedProposedPlanById, planId).pipe(
Effect.flatMap((existingEntry) => {
Expand All @@ -633,7 +662,11 @@ const make = Effect.gen(function* () {
// Drops any buffered proposed-plan entry stored under the given plan id.
const clearBufferedProposedPlan = (planId: string) => {
  return Cache.invalidate(bufferedProposedPlanById, planId);
};

const clearAssistantMessageState = (messageId: MessageId) => clearBufferedAssistantText(messageId);
// Resets all per-message assistant state: the buffered text and the
// "text already emitted" flag. The combined effect yields no value.
const clearAssistantMessageState = (messageId: MessageId) => {
  const dropBufferedText = clearBufferedAssistantText(messageId);
  const dropEmittedFlag = Cache.invalidate(emittedAssistantTextByMessageId, messageId);
  return Effect.asVoid(Effect.all([dropBufferedText, dropEmittedFlag]));
};

const finalizeAssistantMessage = (input: {
event: ProviderRuntimeEvent;
Expand All @@ -646,13 +679,13 @@ const make = Effect.gen(function* () {
fallbackText?: string;
}) =>
Effect.gen(function* () {
const hasEmittedText = yield* hasAssistantTextBeenEmitted(input.messageId);
const bufferedText = yield* takeBufferedAssistantText(input.messageId);
const text =
bufferedText.length > 0
? bufferedText
: (input.fallbackText?.trim().length ?? 0) > 0
? input.fallbackText!
: "";
const text = resolveAssistantCompletionText({
bufferedText,
fallbackText: input.fallbackText,
hasEmittedText,
});

if (text.length > 0) {
yield* orchestrationEngine.dispatch({
Expand All @@ -664,6 +697,7 @@ const make = Effect.gen(function* () {
...(input.turnId ? { turnId: input.turnId } : {}),
createdAt: input.createdAt,
});
yield* markAssistantTextAsEmitted(input.messageId);
}

yield* orchestrationEngine.dispatch({
Expand Down Expand Up @@ -913,6 +947,7 @@ const make = Effect.gen(function* () {
...(turnId ? { turnId } : {}),
createdAt: now,
});
yield* markAssistantTextAsEmitted(assistantMessageId);
}
} else {
yield* orchestrationEngine.dispatch({
Expand All @@ -924,6 +959,7 @@ const make = Effect.gen(function* () {
...(turnId ? { turnId } : {}),
createdAt: now,
});
yield* markAssistantTextAsEmitted(assistantMessageId);
}
}

Expand Down