diff --git a/packages/typescript/ai-anthropic/src/adapters/text.ts b/packages/typescript/ai-anthropic/src/adapters/text.ts index 235d9f5b5..c64597843 100644 --- a/packages/typescript/ai-anthropic/src/adapters/text.ts +++ b/packages/typescript/ai-anthropic/src/adapters/text.ts @@ -289,6 +289,9 @@ export class AnthropicTextAdapter< system: options.systemPrompts?.join('\n'), tools: tools, ...validProviderOptions, + ...(thinkingBudget && { + betas: ['interleaved-thinking-2025-05-14'] as any, + }), } validateTextProviderOptions(requestParams) return requestParams @@ -389,6 +392,18 @@ export class AnthropicTextAdapter< if (role === 'assistant' && message.toolCalls?.length) { const contentBlocks: AnthropicContentBlocks = [] + if (message.thinking?.length) { + for (const thinking of message.thinking) { + if (thinking.signature) { + contentBlocks.push({ + type: 'thinking', + thinking: thinking.content, + signature: thinking.signature, + } as unknown as AnthropicContentBlock) + } + } + } + if (message.content) { const content = typeof message.content === 'string' ? message.content : '' @@ -528,6 +543,7 @@ export class AnthropicTextAdapter< ): AsyncIterable { let accumulatedContent = '' let accumulatedThinking = '' + let accumulatedSignature = '' const timestamp = Date.now() const toolCallsMap = new Map< number, @@ -570,6 +586,7 @@ export class AnthropicTextAdapter< }) } else if (event.content_block.type === 'thinking') { accumulatedThinking = '' + accumulatedSignature = '' // Emit STEP_STARTED for thinking stepId = genId() yield { @@ -615,6 +632,11 @@ export class AnthropicTextAdapter< delta, content: accumulatedThinking, } + } else if ( + (event.delta as { type: string }).type === 'signature_delta' + ) { + accumulatedSignature += + (event.delta as { signature: string }).signature || '' } else if (event.delta.type === 'input_json_delta') { const existing = toolCallsMap.get(currentToolIndex) if (existing) { @@ -644,7 +666,20 @@ export class AnthropicTextAdapter< } } } else if (event.type === 'content_block_stop') { - if (currentBlockType === 'tool_use') { + if (currentBlockType === 'thinking') { + // Emit signature so it can be replayed in multi-turn context + if (accumulatedSignature && stepId) { + yield { + type: 'STEP_FINISHED', + stepId, + model, + timestamp, + delta: '', + content: accumulatedThinking, + signature: accumulatedSignature, + } + } + } else if (currentBlockType === 'tool_use') { const existing = toolCallsMap.get(currentToolIndex) if (existing) { // If tool call wasn't started yet (no args), start it now diff --git a/packages/typescript/ai-client/src/chat-client.ts b/packages/typescript/ai-client/src/chat-client.ts index 4b0e40f70..306770cdd 100644 --- a/packages/typescript/ai-client/src/chat-client.ts +++ b/packages/typescript/ai-client/src/chat-client.ts @@ -153,7 +153,11 @@ export class ChatClient { this.events.textUpdated(this.currentStreamId, messageId, content) } }, - onThinkingUpdate: (messageId: string, content: string) => { + onThinkingUpdate: ( + messageId: string, + _stepId: string, + content: string, + ) => { // Emit thinking update to devtools if (this.currentStreamId) { this.events.thinkingUpdated( diff --git a/packages/typescript/ai/src/activities/chat/index.ts b/packages/typescript/ai/src/activities/chat/index.ts index c7ec866d6..014ddd214 100644 --- a/packages/typescript/ai/src/activities/chat/index.ts +++ b/packages/typescript/ai/src/activities/chat/index.ts @@ -253,6 +253,10 @@ class TextEngine< private totalChunkCount = 0 private currentMessageId: string | null = null private accumulatedContent = '' + private accumulatedThinking: Array<{ content: string; signature?: string }> = + [] + private currentThinkingContent = '' + private currentThinkingSignature = '' private eventOptions?: Record private eventToolNames?: Array private finishedEvent: RunFinishedEvent | null = null @@ -494,6 +498,9 @@ class TextEngine< private async beginIteration(): Promise { this.currentMessageId = this.createId('msg') this.accumulatedContent = '' + this.accumulatedThinking = [] + this.currentThinkingContent = '' + this.currentThinkingSignature = '' this.finishedEvent = null // Update mutable context fields @@ -585,12 +592,15 @@ class TextEngine< case 'RUN_ERROR': this.handleRunErrorEvent(chunk) break + case 'STEP_STARTED': + this.handleStepStartedEvent() + break case 'STEP_FINISHED': this.handleStepFinishedEvent(chunk) break default: - // RUN_STARTED, TEXT_MESSAGE_START, TEXT_MESSAGE_END, STEP_STARTED, + // RUN_STARTED, TEXT_MESSAGE_START, TEXT_MESSAGE_END, // STATE_SNAPSHOT, STATE_DELTA, CUSTOM // - no special handling needed in chat activity break @@ -633,10 +643,32 @@ class TextEngine< this.earlyTermination = true } + private finalizeCurrentThinkingStep(): void { + if (this.currentThinkingContent) { + this.accumulatedThinking.push({ + content: this.currentThinkingContent, + ...(this.currentThinkingSignature && { + signature: this.currentThinkingSignature, + }), + }) + this.currentThinkingContent = '' + this.currentThinkingSignature = '' + } + } + + private handleStepStartedEvent(): void { + this.finalizeCurrentThinkingStep() + } + private handleStepFinishedEvent( - _chunk: Extract, + chunk: Extract, ): void { - // State tracking for STEP_FINISHED is handled by middleware + if (chunk.delta) { + this.currentThinkingContent += chunk.delta + } + if (chunk.signature) { + this.currentThinkingSignature = chunk.signature + } } private async *checkForPendingToolCalls(): AsyncGenerator< @@ -939,12 +971,17 @@ class TextEngine< } private addAssistantToolCallMessage(toolCalls: Array): void { + this.finalizeCurrentThinkingStep() + this.messages = [ ...this.messages, { role: 'assistant', content: this.accumulatedContent || null, toolCalls, + ...(this.accumulatedThinking.length > 0 && { + thinking: this.accumulatedThinking, + }), }, ] } diff --git a/packages/typescript/ai/src/activities/chat/messages.ts b/packages/typescript/ai/src/activities/chat/messages.ts index b7f97b880..cecf16920 100644 --- a/packages/typescript/ai/src/activities/chat/messages.ts +++ b/packages/typescript/ai/src/activities/chat/messages.ts @@ -165,6 +165,7 @@ function isToolCallIncluded(part: ToolCallPart): boolean { function buildAssistantMessages(uiMessage: UIMessage): Array { const messageList: Array = [] let current = createSegment() + let pendingThinking: Array<{ content: string; signature?: string }> = [] // Track emitted tool result IDs to avoid duplicates. // A tool call can have BOTH an explicit tool-result part AND an output @@ -181,7 +182,9 @@ function buildAssistantMessages(uiMessage: UIMessage): Array { role: 'assistant', content, ...(hasToolCalls && { toolCalls: current.toolCalls }), + ...(pendingThinking.length > 0 && { thinking: pendingThinking }), }) + pendingThinking = [] } current = createSegment() } @@ -227,7 +230,15 @@ function buildAssistantMessages(uiMessage: UIMessage): Array { } break - // thinking parts are skipped - they're UI-only + case 'thinking': + if (part.content) { + pendingThinking.push({ + content: part.content, + ...(part.signature && { signature: part.signature }), + }) + } + break + default: break } diff --git a/packages/typescript/ai/src/activities/chat/stream/message-updaters.ts b/packages/typescript/ai/src/activities/chat/stream/message-updaters.ts index 80b94d59a..09a565f0d 100644 --- a/packages/typescript/ai/src/activities/chat/stream/message-updaters.ts +++ b/packages/typescript/ai/src/activities/chat/stream/message-updaters.ts @@ -244,12 +244,15 @@ export function updateToolCallApprovalResponse( } /** - * Update or add a thinking part to a message. + * Update or add a thinking part to a message, keyed by stepId. + * Each distinct stepId produces its own ThinkingPart. */ export function updateThinkingPart( messages: Array, messageId: string, + stepId: string, content: string, + signature?: string, ): Array { return messages.map((msg) => { if (msg.id !== messageId) { @@ -257,15 +260,19 @@ export function updateThinkingPart( } const parts = [...msg.parts] - const thinkingPartIndex = parts.findIndex((p) => p.type === 'thinking') + const thinkingPartIndex = parts.findIndex( + (p) => p.type === 'thinking' && p.stepId === stepId, + ) const thinkingPart: ThinkingPart = { type: 'thinking', content, + stepId, + ...(signature && { signature }), } if (thinkingPartIndex >= 0) { - // Update existing thinking part + // Update existing thinking part for this step parts[thinkingPartIndex] = thinkingPart } else { // Add new thinking part at the end (preserve natural streaming order) diff --git a/packages/typescript/ai/src/activities/chat/stream/processor.ts b/packages/typescript/ai/src/activities/chat/stream/processor.ts index dc5330f17..998bda336 100644 --- a/packages/typescript/ai/src/activities/chat/stream/processor.ts +++ b/packages/typescript/ai/src/activities/chat/stream/processor.ts @@ -90,7 +90,11 @@ export interface StreamProcessorEvents { state: ToolCallState, args: string, ) => void - onThinkingUpdate?: (messageId: string, content: string) => void + onThinkingUpdate?: ( + messageId: string, + stepId: string, + content: string, + ) => void } /** @@ -139,6 +143,7 @@ export class StreamProcessor { private activeMessageIds: Set = new Set() private toolCallToMessage: Map = new Map() private pendingManualMessageId: string | null = null + private pendingThinkingStepId: string | null = null // Run tracking (for concurrent run safety) private activeRuns = new Set() @@ -495,8 +500,12 @@ export class StreamProcessor { this.handleRunStartedEvent(chunk) break + case 'STEP_STARTED': + this.handleStepStartedEvent(chunk) + break + default: - // STEP_STARTED, STATE_SNAPSHOT, STATE_DELTA - no special handling needed + // STATE_SNAPSHOT, STATE_DELTA - no special handling needed break } } @@ -518,7 +527,10 @@ export class StreamProcessor { totalTextContent: '', currentSegmentText: '', lastEmittedText: '', - thinkingContent: '', + thinkingSteps: new Map(), + thinkingStepSignatures: new Map(), + thinkingStepOrder: [], + currentThinkingStepId: null, toolCalls: new Map(), toolCallOrder: [], hasToolCallsSinceTextStart: false, @@ -1056,11 +1068,38 @@ export class StreamProcessor { this.events.onError?.(new Error(chunk.error.message || 'An error occurred')) } + /** + * Handle STEP_STARTED event (for thinking/reasoning content). + * + * Records the stepId so that subsequent STEP_FINISHED deltas accumulate + * into their own ThinkingPart. Does not create a message — the message + * is lazily created when the first STEP_FINISHED content arrives. + */ + private handleStepStartedEvent( + chunk: Extract, + ): void { + const activeId = this.getActiveAssistantMessageId() + if (activeId) { + const state = this.getMessageState(activeId) + if (state) { + state.currentThinkingStepId = chunk.stepId + if (!state.thinkingSteps.has(chunk.stepId)) { + state.thinkingSteps.set(chunk.stepId, '') + state.thinkingStepOrder.push(chunk.stepId) + } + return + } + } + + // No active message yet — defer until ensureAssistantMessage in STEP_FINISHED + this.pendingThinkingStepId = chunk.stepId + } + /** * Handle STEP_FINISHED event (for thinking/reasoning content). * - * Accumulates delta into thinkingContent and updates a single ThinkingPart - * in the UIMessage (replaced in-place, not appended). + * Accumulates delta into the current thinking step's content and updates + * the corresponding ThinkingPart in the UIMessage. * * @see docs/chat-architecture.md#thinkingreasoning-content — Thinking flow */ @@ -1071,7 +1110,26 @@ export class StreamProcessor { this.getActiveAssistantMessageId() ?? undefined, ) - const previous = state.thinkingContent + // Consume pending stepId from STEP_STARTED that arrived before the message existed + if (this.pendingThinkingStepId) { + state.currentThinkingStepId = this.pendingThinkingStepId + if (!state.thinkingSteps.has(this.pendingThinkingStepId)) { + state.thinkingSteps.set(this.pendingThinkingStepId, '') + state.thinkingStepOrder.push(this.pendingThinkingStepId) + } + this.pendingThinkingStepId = null + } + + const stepId = state.currentThinkingStepId ?? chunk.stepId + + // Auto-initialize if no prior STEP_STARTED (backward compat) + if (!state.thinkingSteps.has(stepId)) { + state.thinkingSteps.set(stepId, '') + state.thinkingStepOrder.push(stepId) + state.currentThinkingStepId = stepId + } + + const previous = state.thinkingSteps.get(stepId)! let nextThinking = previous // Prefer delta over content @@ -1087,18 +1145,24 @@ export class StreamProcessor { } } - state.thinkingContent = nextThinking + state.thinkingSteps.set(stepId, nextThinking) + + if (chunk.signature) { + state.thinkingStepSignatures.set(stepId, chunk.signature) + } // Update UIMessage this.messages = updateThinkingPart( this.messages, messageId, - state.thinkingContent, + stepId, + nextThinking, + state.thinkingStepSignatures.get(stepId), ) this.emitMessagesChange() // Emit granular event - this.events.onThinkingUpdate?.(messageId, state.thinkingContent) + this.events.onThinkingUpdate?.(messageId, stepId, nextThinking) } /** @@ -1384,7 +1448,9 @@ export class StreamProcessor { for (const state of this.messageStates.values()) { content += state.totalTextContent - thinking += state.thinkingContent + for (const stepId of state.thinkingStepOrder) { + thinking += state.thinkingSteps.get(stepId) ?? '' + } } return { @@ -1406,7 +1472,9 @@ export class StreamProcessor { for (const state of this.messageStates.values()) { content += state.totalTextContent - thinking += state.thinkingContent + for (const stepId of state.thinkingStepOrder) { + thinking += state.thinkingSteps.get(stepId) ?? '' + } for (const [id, tc] of state.toolCalls) { toolCalls.set(id, tc) } diff --git a/packages/typescript/ai/src/activities/chat/stream/types.ts b/packages/typescript/ai/src/activities/chat/stream/types.ts index c1806238f..0fb0c6a48 100644 --- a/packages/typescript/ai/src/activities/chat/stream/types.ts +++ b/packages/typescript/ai/src/activities/chat/stream/types.ts @@ -56,7 +56,10 @@ export interface MessageStreamState { totalTextContent: string currentSegmentText: string lastEmittedText: string - thinkingContent: string + thinkingSteps: Map + thinkingStepSignatures: Map + thinkingStepOrder: Array + currentThinkingStepId: string | null toolCalls: Map toolCallOrder: Array hasToolCallsSinceTextStart: boolean diff --git a/packages/typescript/ai/src/types.ts b/packages/typescript/ai/src/types.ts index f5ddaee59..b41e99d58 100644 --- a/packages/typescript/ai/src/types.ts +++ b/packages/typescript/ai/src/types.ts @@ -268,6 +268,7 @@ export interface ModelMessage< name?: string toolCalls?: Array toolCallId?: string + thinking?: Array<{ content: string; signature?: string }> } /** @@ -306,6 +307,8 @@ export interface ToolResultPart { export interface ThinkingPart { type: 'thinking' content: string + stepId?: string + signature?: string } export type MessagePart = @@ -901,6 +904,8 @@ export interface StepFinishedEvent extends BaseAGUIEvent { delta: string /** Full accumulated thinking content (optional, for debugging) */ content?: string + /** Provider signature for the thinking block */ + signature?: string } /** diff --git a/packages/typescript/ai/tests/message-updaters.test.ts b/packages/typescript/ai/tests/message-updaters.test.ts index 5de1a031e..f062ee482 100644 --- a/packages/typescript/ai/tests/message-updaters.test.ts +++ b/packages/typescript/ai/tests/message-updaters.test.ts @@ -777,24 +777,31 @@ describe('message-updaters', () => { describe('updateThinkingPart', () => { it('should add a new thinking part', () => { const messages = [createMessage('msg-1')] - const result = updateThinkingPart(messages, 'msg-1', 'Let me think...') + const result = updateThinkingPart( + messages, + 'msg-1', + 'step-1', + 'Let me think...', + ) expect(result[0]?.parts).toHaveLength(1) expect(result[0]?.parts[0]).toEqual({ type: 'thinking', content: 'Let me think...', + stepId: 'step-1', }) }) - it('should update existing thinking part', () => { + it('should update existing thinking part by stepId', () => { const messages = [ createMessage('msg-1', 'assistant', [ - { type: 'thinking', content: 'Let me think' }, + { type: 'thinking', content: 'Let me think', stepId: 'step-1' }, ]), ] const result = updateThinkingPart( messages, 'msg-1', + 'step-1', 'Let me think about this', ) @@ -802,26 +809,29 @@ describe('message-updaters', () => { expect(result[0]?.parts[0]).toEqual({ type: 'thinking', content: 'Let me think about this', + stepId: 'step-1', }) }) - it('should only update the first thinking part if multiple exist', () => { + it('should create separate parts for different stepIds', () => { const messages = [ createMessage('msg-1', 'assistant', [ - { type: 'thinking', content: 'First' }, + { type: 'thinking', content: 'First', stepId: 'step-1' }, { type: 'text', content: 'Some text' }, - { type: 'thinking', content: 'Second' }, ]), ] - const result = updateThinkingPart(messages, 'msg-1', 'Updated first') + const result = updateThinkingPart(messages, 'msg-1', 'step-2', 'Second') + expect(result[0]?.parts).toHaveLength(3) expect(result[0]?.parts[0]).toEqual({ type: 'thinking', - content: 'Updated first', + content: 'First', + stepId: 'step-1', }) expect(result[0]?.parts[2]).toEqual({ type: 'thinking', content: 'Second', + stepId: 'step-2', }) }) @@ -830,7 +840,12 @@ describe('message-updaters', () => { createMessage('msg-1'), createMessage('msg-2', 'user', [{ type: 'text', content: 'Hi' }]), ] - const result = updateThinkingPart(messages, 'msg-1', 'Thinking...') + const result = updateThinkingPart( + messages, + 'msg-1', + 'step-1', + 'Thinking...', + ) expect(result[0]?.parts).toHaveLength(1) expect(result[1]?.parts).toHaveLength(1) diff --git a/packages/typescript/ai/tests/stream-processor.test.ts b/packages/typescript/ai/tests/stream-processor.test.ts index 6bfe3a49f..c461c67f7 100644 --- a/packages/typescript/ai/tests/stream-processor.test.ts +++ b/packages/typescript/ai/tests/stream-processor.test.ts @@ -65,6 +65,8 @@ const ev = { ) => chunk('RUN_FINISHED', { runId, finishReason }), runError: (message: string, runId = 'run-1') => chunk('RUN_ERROR', { runId, error: { message } }), + stepStarted: (stepId = 'step-1', stepType = 'thinking') => + chunk('STEP_STARTED', { stepId, stepType }), stepFinished: (delta: string, stepId = 'step-1') => chunk('STEP_FINISHED', { stepId, delta }), custom: (name: string, value?: unknown) => chunk('CUSTOM', { name, value }), @@ -758,7 +760,7 @@ describe('StreamProcessor', () => { ).toBe(true) }) - it('should update a single ThinkingPart in-place', () => { + it('should update a single ThinkingPart in-place for same stepId', () => { const processor = new StreamProcessor() processor.prepareAssistantMessage() @@ -766,12 +768,67 @@ describe('StreamProcessor', () => { processor.processChunk(ev.stepFinished('B')) processor.processChunk(ev.stepFinished('C')) - // Only one thinking part, not three + // Only one thinking part, not three (same default stepId) const parts = processor.getMessages()[0]!.parts const thinkingParts = parts.filter((p) => p.type === 'thinking') expect(thinkingParts).toHaveLength(1) expect((thinkingParts[0] as any).content).toBe('ABC') }) + + it('should create separate ThinkingParts for different stepIds', () => { + const processor = new StreamProcessor() + processor.prepareAssistantMessage() + + processor.processChunk(ev.stepStarted('step-1')) + processor.processChunk(ev.stepFinished('First thought', 'step-1')) + processor.processChunk(ev.stepFinished(' continued', 'step-1')) + + processor.processChunk(ev.stepStarted('step-2')) + processor.processChunk(ev.stepFinished('Second thought', 'step-2')) + + const parts = processor.getMessages()[0]!.parts + const thinkingParts = parts.filter((p) => p.type === 'thinking') + expect(thinkingParts).toHaveLength(2) + expect((thinkingParts[0] as any).content).toBe('First thought continued') + expect((thinkingParts[0] as any).stepId).toBe('step-1') + expect((thinkingParts[1] as any).content).toBe('Second thought') + expect((thinkingParts[1] as any).stepId).toBe('step-2') + }) + + it('should handle STEP_FINISHED without prior STEP_STARTED (backward compat)', () => { + const processor = new StreamProcessor() + processor.prepareAssistantMessage() + + // No STEP_STARTED, just STEP_FINISHED with a stepId + processor.processChunk(ev.stepFinished('thinking...', 'auto-step')) + + const parts = processor.getMessages()[0]!.parts + const thinkingParts = parts.filter((p) => p.type === 'thinking') + expect(thinkingParts).toHaveLength(1) + expect((thinkingParts[0] as any).content).toBe('thinking...') + expect((thinkingParts[0] as any).stepId).toBe('auto-step') + }) + + it('getResult().thinking should concatenate all steps in order', () => { + const processor = new StreamProcessor() + processor.prepareAssistantMessage() + + processor.processChunk(ev.runStarted()) + processor.processChunk(ev.stepStarted('step-1')) + processor.processChunk(ev.stepFinished('First. ', 'step-1')) + processor.processChunk(ev.stepStarted('step-2')) + processor.processChunk(ev.stepFinished('Second.', 'step-2')) + processor.processChunk(ev.textStart()) + processor.processChunk(ev.textContent('Answer')) + processor.processChunk(ev.textEnd()) + processor.processChunk(ev.runFinished('stop')) + + processor.finalizeStream() + + const state = processor.getState() + expect(state.thinking).toBe('First. Second.') + expect(state.content).toBe('Answer') + }) }) // ========================================================================== @@ -1665,7 +1722,7 @@ describe('StreamProcessor', () => { ) }) - it('onThinkingUpdate should fire for each STEP_FINISHED delta', () => { + it('onThinkingUpdate should fire for each STEP_FINISHED delta with stepId', () => { const events = spyEvents() const processor = new StreamProcessor({ events }) processor.prepareAssistantMessage() @@ -1675,9 +1732,14 @@ describe('StreamProcessor', () => { const msgId = processor.getCurrentAssistantMessageId()! expect(events.onThinkingUpdate).toHaveBeenCalledTimes(2) - expect(events.onThinkingUpdate).toHaveBeenCalledWith(msgId, 'Thinking') expect(events.onThinkingUpdate).toHaveBeenCalledWith( msgId, + 'step-1', + 'Thinking', + ) + expect(events.onThinkingUpdate).toHaveBeenCalledWith( + msgId, + 'step-1', 'Thinking more', ) })