Skip to content

Commit aa39435

Browse files
authored
feat(opencode): add LLM stream stall detection and check_task visibility (#398) (#403)
* feat(opencode): add LLM stream stall detection and check_task visibility (#398) - Add lastTokenTime tracking in processor.ts stream loop - Detect stalls when no tokens received for configurable timeout (default 3min, OPENCODE_STALL_TIMEOUT_MS) - Export stalledSessions Set for cross-module stall tracking - Extend TaskResult with stallDetected, lastToolCalls, lastActivity for running tasks - Show last 3 tool calls with name, status, and timestamp in check_task - Validate OPENCODE_STALL_TIMEOUT_MS for NaN and non-positive values - Update async-tasks fork-feature manifest with stall detection markers Fixes #398, Fixes #399, Fixes #400 * refactor(opencode): encapsulate stalledSessions, add type guard, extract getStallTimeout (#398) - Make stalledSessions private with isSessionStalled/markSessionStalled/clearSessionStalled accessors - Replace raw Set export with encapsulated functions for proper module boundary - Add hasStartTime type guard replacing unsafe cast in check_task.ts - Extract getStallTimeout() function from inline env var parsing - Add getStallTimeout to fork-features manifest criticalCode - Strengthen type guard to validate time.start is number
1 parent 1250207 commit aa39435

File tree

5 files changed

+539
-7
lines changed

5 files changed

+539
-7
lines changed

.fork-features/manifest.json

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"modifiedFiles": [
2626
"packages/opencode/src/session/index.ts",
2727
"packages/opencode/src/session/prompt.ts",
28+
"packages/opencode/src/session/processor.ts",
2829
"packages/opencode/src/tool/registry.ts"
2930
],
3031
"criticalCode": [
@@ -41,13 +42,24 @@
4142
"parent_session_id",
4243
"reserveTaskSlot",
4344
"getSessionTaskCount",
44-
"MAX_STORED_TASK_RESULTS"
45+
"MAX_STORED_TASK_RESULTS",
46+
"stallDetected",
47+
"lastToolCalls",
48+
"lastActivity",
49+
"isSessionStalled",
50+
"markSessionStalled",
51+
"clearSessionStalled",
52+
"lastTokenTime",
53+
"OPENCODE_STALL_TIMEOUT_MS",
54+
"getStallTimeout",
55+
"LLM stream stalled"
4556
],
4657
"tests": [
4758
"packages/opencode/test/tool/check_task.test.ts",
4859
"packages/opencode/test/tool/list_tasks.test.ts",
4960
"packages/opencode/test/tool/cancel_task.test.ts",
50-
"packages/opencode/test/session/async-tasks.test.ts"
61+
"packages/opencode/test/session/async-tasks.test.ts",
62+
"packages/opencode/test/session/processor-stall.test.ts"
5163
],
5264
"upstreamTracking": {
5365
"relatedPRs": ["anomalyco/opencode#7206"],
@@ -58,7 +70,14 @@
5870
"task.*concurrency.*slot",
5971
"cancel.*task",
6072
"CancelTaskTool",
61-
"tryCancel"
73+
"tryCancel",
74+
"stallDetected",
75+
"lastToolCalls",
76+
"lastActivity",
77+
"export function isSessionStalled",
78+
"stall.*detector",
79+
"stream.*stall",
80+
"lastTokenTime"
6281
]
6382
}
6483
},

packages/opencode/src/session/processor.ts

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,28 @@ export namespace SessionProcessor {
2020
const DOOM_LOOP_THRESHOLD = 3
2121
const log = Log.create({ service: "session.processor" })
2222

23+
const stalledSessions = new Set<string>()
24+
25+
export function isSessionStalled(id: string): boolean {
26+
return stalledSessions.has(id)
27+
}
28+
29+
function markSessionStalled(id: string) {
30+
stalledSessions.add(id)
31+
}
32+
33+
function clearSessionStalled(id: string) {
34+
stalledSessions.delete(id)
35+
}
36+
37+
function getStallTimeout(): number {
38+
const timeout = parseInt(process.env.OPENCODE_STALL_TIMEOUT_MS || "180000", 10)
39+
if (isNaN(timeout) || timeout <= 0) {
40+
throw new Error(`Invalid OPENCODE_STALL_TIMEOUT_MS: must be positive number, got "${process.env.OPENCODE_STALL_TIMEOUT_MS}"`)
41+
}
42+
return timeout
43+
}
44+
2345
export type Info = Awaited<ReturnType<typeof create>>
2446
export type Result = Awaited<ReturnType<Info["process"]>>
2547

@@ -50,16 +72,24 @@ export namespace SessionProcessor {
5072
try {
5173
let currentText: MessageV2.TextPart | undefined
5274
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
75+
let lastTokenTime = Date.now()
76+
const stallTimeout = getStallTimeout()
5377
const stream = await LLM.stream(streamInput)
5478

5579
for await (const value of stream.fullStream) {
5680
input.abort.throwIfAborted()
81+
if (Date.now() - lastTokenTime > stallTimeout) {
82+
log.warn("stall", { sessionID: input.sessionID, elapsed: Date.now() - lastTokenTime })
83+
markSessionStalled(input.sessionID)
84+
throw new Error(`LLM stream stalled: no tokens received for ${Math.round(stallTimeout / 60000)} minutes`)
85+
}
5786
switch (value.type) {
5887
case "start":
5988
SessionStatus.set(input.sessionID, { type: "busy" })
6089
break
6190

6291
case "reasoning-start":
92+
lastTokenTime = Date.now()
6393
if (value.id in reasoningMap) {
6494
continue
6595
}
@@ -79,6 +109,7 @@ export namespace SessionProcessor {
79109
break
80110

81111
case "reasoning-delta":
112+
lastTokenTime = Date.now()
82113
if (value.id in reasoningMap) {
83114
const part = reasoningMap[value.id]
84115
part.text += value.text
@@ -132,6 +163,7 @@ export namespace SessionProcessor {
132163
break
133164

134165
case "tool-call": {
166+
lastTokenTime = Date.now()
135167
const match = toolcalls[value.toolCallId]
136168
if (match) {
137169
const part = await Session.updatePart({
@@ -178,6 +210,7 @@ export namespace SessionProcessor {
178210
break
179211
}
180212
case "tool-result": {
213+
lastTokenTime = Date.now()
181214
const match = toolcalls[value.toolCallId]
182215
if (match && match.state.status === "running") {
183216
await Session.updatePart({
@@ -218,6 +251,7 @@ export namespace SessionProcessor {
218251
}
219252

220253
case "tool-error": {
254+
lastTokenTime = Date.now()
221255
const match = toolcalls[value.toolCallId]
222256
const errorMsg = value.error instanceof Error ? value.error.message : String(value.error)
223257
if (match && match.state.status === "running") {
@@ -336,6 +370,7 @@ export namespace SessionProcessor {
336370
break
337371

338372
case "text-delta":
373+
lastTokenTime = Date.now()
339374
if (currentText) {
340375
currentText.text += value.text
341376
if (value.providerMetadata) currentText.metadata = value.providerMetadata
@@ -411,6 +446,7 @@ export namespace SessionProcessor {
411446
error: input.assistantMessage.error,
412447
})
413448
SessionStatus.set(input.sessionID, { type: "idle" })
449+
clearSessionStalled(input.sessionID)
414450
}
415451
if (snapshot) {
416452
const patch = await Snapshot.patch(snapshot)
@@ -445,13 +481,27 @@ export namespace SessionProcessor {
445481
}
446482
input.assistantMessage.time.completed = Date.now()
447483
await Session.updateMessage(input.assistantMessage)
448-
if (needsCompaction) return "compact"
449-
if (blocked) return "stop"
450-
if (input.assistantMessage.error) return "stop"
484+
if (needsCompaction) {
485+
clearSessionStalled(input.sessionID)
486+
return "compact"
487+
}
488+
if (blocked) {
489+
clearSessionStalled(input.sessionID)
490+
return "stop"
491+
}
492+
if (input.assistantMessage.error) {
493+
clearSessionStalled(input.sessionID)
494+
return "stop"
495+
}
496+
clearSessionStalled(input.sessionID)
451497
return "continue"
452498
}
453499
},
454500
}
455501
return result
456502
}
457503
}
504+
505+
export function isSessionStalled(id: string): boolean {
506+
return SessionProcessor.isSessionStalled(id)
507+
}

packages/opencode/src/tool/check_task.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { listBackgroundTasks, getBackgroundTaskResult, getBackgroundTaskMetadata
66
import { Instance } from "../project/instance"
77
import { SessionStatus } from "../session/status"
88
import { MessageV2 } from "../session/message-v2"
9+
import { isSessionStalled } from "../session/processor"
910

1011
type TaskStatus = "running" | "completed" | "failed" | "not_found" | "cancelled"
1112

@@ -19,6 +20,13 @@ interface TaskResult {
1920
duration_seconds?: number
2021
started_at?: string
2122
completed_at?: string
23+
stallDetected?: boolean
24+
lastToolCalls?: {
25+
name: string
26+
status: string
27+
time: string
28+
}[]
29+
lastActivity?: string
2230
}
2331

2432
interface CheckTaskMetadata {
@@ -27,6 +35,10 @@ interface CheckTaskMetadata {
2735
sessionId?: string
2836
}
2937

38+
function hasStartTime(part: MessageV2.ToolPart): part is MessageV2.ToolPart & { state: { time: { start: number } } } {
39+
return part.state.status !== "pending" && "time" in part.state && typeof (part.state as { time: { start: unknown } }).time.start === "number"
40+
}
41+
3042
function checkBackgroundTask(id: string): TaskResult | undefined {
3143
const tasks = listBackgroundTasks()
3244
if (tasks.pending.includes(id)) {
@@ -84,10 +96,28 @@ async function checkSessionTask(id: string, callerSessionId?: string): Promise<T
8496
const status = SessionStatus.get(id)
8597

8698
if (status.type === "busy") {
99+
const messages = await Session.messages({ sessionID: id, limit: 5 })
100+
const toolParts = messages.flatMap((msg) =>
101+
msg.info.role === "assistant" ? msg.parts.filter((part): part is MessageV2.ToolPart => part.type === "tool") : []
102+
)
103+
const recentTools = toolParts
104+
.filter(hasStartTime)
105+
.slice(-3)
106+
.map((part) => ({
107+
name: part.tool,
108+
status: part.state.status,
109+
time: new Date(part.state.time.start).toISOString(),
110+
}))
111+
const lastActivity = recentTools.length > 0 ? recentTools[recentTools.length - 1].time : new Date().toISOString()
112+
const stallDetected = isSessionStalled(id)
113+
87114
return {
88115
task_id: id,
89116
status: "running",
90117
started_at: started,
118+
stallDetected,
119+
lastToolCalls: recentTools,
120+
lastActivity,
91121
}
92122
}
93123

0 commit comments

Comments
 (0)