Skip to content

Commit bc812e9

Browse files
Handle Claude stream interruption and collapse duplicate work log update
- close Claude sessions cleanly when the runtime stream aborts mid-turn - normalize interrupted stream exits into a completed interrupted turn - collapse repeated tool lifecycle updates into a single work log entry
1 parent fda3fa3 commit bc812e9

4 files changed

Lines changed: 524 additions & 63 deletions

File tree

apps/server/src/provider/Layers/ClaudeAdapter.test.ts

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import type {
1010
SDKMessage,
1111
SDKUserMessage,
1212
} from "@anthropic-ai/claude-agent-sdk";
13-
import { ApprovalRequestId, ProviderItemId, ThreadId } from "@t3tools/contracts";
13+
import {
14+
ApprovalRequestId,
15+
ProviderItemId,
16+
ProviderRuntimeEvent,
17+
ThreadId,
18+
} from "@t3tools/contracts";
1419
import { assert, describe, it } from "@effect/vitest";
1520
import { Effect, Fiber, Layer, Random, Stream } from "effect";
1621

@@ -22,8 +27,12 @@ import { makeClaudeAdapterLive, type ClaudeAdapterLiveOptions } from "./ClaudeAd
2227

2328
class FakeClaudeQuery implements AsyncIterable<SDKMessage> {
2429
private readonly queue: Array<SDKMessage> = [];
25-
private readonly resolvers: Array<(value: IteratorResult<SDKMessage>) => void> = [];
30+
private readonly waiters: Array<{
31+
readonly resolve: (value: IteratorResult<SDKMessage>) => void;
32+
readonly reject: (reason: unknown) => void;
33+
}> = [];
2634
private done = false;
35+
private failure: unknown | undefined;
2736

2837
public readonly interruptCalls: Array<void> = [];
2938
public readonly setModelCalls: Array<string | undefined> = [];
@@ -35,21 +44,33 @@ class FakeClaudeQuery implements AsyncIterable<SDKMessage> {
3544
if (this.done) {
3645
return;
3746
}
38-
const resolver = this.resolvers.shift();
39-
if (resolver) {
40-
resolver({ done: false, value: message });
47+
const waiter = this.waiters.shift();
48+
if (waiter) {
49+
waiter.resolve({ done: false, value: message });
4150
return;
4251
}
4352
this.queue.push(message);
4453
}
4554

55+
fail(cause: unknown): void {
56+
if (this.done) {
57+
return;
58+
}
59+
this.done = true;
60+
this.failure = cause;
61+
for (const waiter of this.waiters.splice(0)) {
62+
waiter.reject(cause);
63+
}
64+
}
65+
4666
finish(): void {
4767
if (this.done) {
4868
return;
4969
}
5070
this.done = true;
51-
for (const resolver of this.resolvers.splice(0)) {
52-
resolver({ done: true, value: undefined });
71+
this.failure = undefined;
72+
for (const waiter of this.waiters.splice(0)) {
73+
waiter.resolve({ done: true, value: undefined });
5374
}
5475
}
5576

@@ -86,14 +107,22 @@ class FakeClaudeQuery implements AsyncIterable<SDKMessage> {
86107
});
87108
}
88109
}
110+
if (this.failure !== undefined) {
111+
const failure = this.failure;
112+
this.failure = undefined;
113+
return Promise.reject(failure);
114+
}
89115
if (this.done) {
90116
return Promise.resolve({
91117
done: true,
92118
value: undefined,
93119
});
94120
}
95-
return new Promise((resolve) => {
96-
this.resolvers.push(resolve);
121+
return new Promise((resolve, reject) => {
122+
this.waiters.push({
123+
resolve,
124+
reject,
125+
});
97126
});
98127
},
99128
};
@@ -1037,6 +1066,71 @@ describe("ClaudeAdapterLive", () => {
10371066
);
10381067
});
10391068

1069+
it.effect("closes the session when the Claude stream aborts after a turn starts", () => {
1070+
const harness = makeHarness();
1071+
return Effect.gen(function* () {
1072+
const adapter = yield* ClaudeAdapter;
1073+
const runtimeEvents: Array<ProviderRuntimeEvent> = [];
1074+
1075+
const runtimeEventsFiber = Effect.runFork(
1076+
Stream.runForEach(adapter.streamEvents, (event) =>
1077+
Effect.sync(() => {
1078+
runtimeEvents.push(event);
1079+
}),
1080+
),
1081+
);
1082+
1083+
yield* adapter.startSession({
1084+
threadId: THREAD_ID,
1085+
provider: "claudeAgent",
1086+
runtimeMode: "full-access",
1087+
});
1088+
1089+
const turn = yield* adapter.sendTurn({
1090+
threadId: THREAD_ID,
1091+
input: "hello",
1092+
attachments: [],
1093+
});
1094+
1095+
harness.query.fail(new Error("All fibers interrupted without error"));
1096+
1097+
yield* Effect.yieldNow;
1098+
yield* Effect.yieldNow;
1099+
yield* Effect.yieldNow;
1100+
runtimeEventsFiber.interruptUnsafe();
1101+
assert.deepEqual(
1102+
runtimeEvents.map((event) => event.type),
1103+
[
1104+
"session.started",
1105+
"session.configured",
1106+
"session.state.changed",
1107+
"turn.started",
1108+
"turn.completed",
1109+
"session.exited",
1110+
],
1111+
);
1112+
1113+
const turnCompleted = runtimeEvents[4];
1114+
assert.equal(turnCompleted?.type, "turn.completed");
1115+
if (turnCompleted?.type === "turn.completed") {
1116+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1117+
assert.equal(turnCompleted.payload.state, "interrupted");
1118+
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
1119+
}
1120+
1121+
const sessionExited = runtimeEvents[5];
1122+
assert.equal(sessionExited?.type, "session.exited");
1123+
1124+
assert.equal(yield* adapter.hasSession(THREAD_ID), false);
1125+
const sessions = yield* adapter.listSessions();
1126+
assert.equal(sessions.length, 0);
1127+
assert.equal(harness.query.closeCalls, 1);
1128+
}).pipe(
1129+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1130+
Effect.provide(harness.layer),
1131+
);
1132+
});
1133+
10401134
it.effect("forwards Claude task progress summaries for subagent updates", () => {
10411135
const harness = makeHarness();
10421136
return Effect.gen(function* () {

apps/server/src/provider/Layers/ClaudeAdapter.ts

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ import {
5151
DateTime,
5252
Deferred,
5353
Effect,
54+
Exit,
5455
FileSystem,
56+
Fiber,
5557
Layer,
5658
Queue,
5759
Random,
@@ -141,6 +143,7 @@ interface ClaudeSessionContext {
141143
session: ProviderSession;
142144
readonly promptQueue: Queue.Queue<PromptQueueItem>;
143145
readonly query: ClaudeQueryRuntime;
146+
streamFiber: Fiber.Fiber<void, Error> | undefined;
144147
readonly startedAt: string;
145148
readonly basePermissionMode: PermissionMode | undefined;
146149
resumeSessionId: string | undefined;
@@ -189,6 +192,47 @@ function toMessage(cause: unknown, fallback: string): string {
189192
return fallback;
190193
}
191194

195+
function toError(cause: unknown, fallback: string): Error {
196+
return cause instanceof Error ? cause : new Error(toMessage(cause, fallback));
197+
}
198+
199+
function normalizeClaudeStreamMessages(cause: Cause.Cause<Error>): ReadonlyArray<string> {
200+
const errors = Cause.prettyErrors(cause)
201+
.map((error) => error.message.trim())
202+
.filter((message) => message.length > 0);
203+
if (errors.length > 0) {
204+
return errors;
205+
}
206+
207+
const squashed = toMessage(Cause.squash(cause), "").trim();
208+
return squashed.length > 0 ? [squashed] : [];
209+
}
210+
211+
function isClaudeInterruptedMessage(message: string): boolean {
212+
const normalized = message.toLowerCase();
213+
return (
214+
normalized.includes("all fibers interrupted without error") ||
215+
normalized.includes("request was aborted") ||
216+
normalized.includes("interrupted by user")
217+
);
218+
}
219+
220+
function isClaudeInterruptedCause(cause: Cause.Cause<Error>): boolean {
221+
return (
222+
Cause.hasInterruptsOnly(cause) ||
223+
normalizeClaudeStreamMessages(cause).some(isClaudeInterruptedMessage)
224+
);
225+
}
226+
227+
function messageFromClaudeStreamCause(cause: Cause.Cause<Error>, fallback: string): string {
228+
return normalizeClaudeStreamMessages(cause)[0] ?? fallback;
229+
}
230+
231+
function interruptionMessageFromClaudeCause(cause: Cause.Cause<Error>): string {
232+
const message = messageFromClaudeStreamCause(cause, "Claude runtime interrupted.");
233+
return isClaudeInterruptedMessage(message) ? "Claude runtime interrupted." : message;
234+
}
235+
192236
function resultErrorsText(result: SDKResultMessage): string {
193237
return "errors" in result && Array.isArray(result.errors)
194238
? result.errors.join(" ").toLowerCase()
@@ -2045,21 +2089,48 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) {
20452089
}
20462090
});
20472091

2048-
const runSdkStream = (context: ClaudeSessionContext): Effect.Effect<void> =>
2049-
Stream.fromAsyncIterable(context.query, (cause) => cause).pipe(
2092+
const runSdkStream = (context: ClaudeSessionContext): Effect.Effect<void, Error> =>
2093+
Stream.fromAsyncIterable(context.query, (cause) =>
2094+
toError(cause, "Claude runtime stream failed."),
2095+
).pipe(
20502096
Stream.takeWhile(() => !context.stopped),
20512097
Stream.runForEach((message) => handleSdkMessage(context, message)),
2052-
Effect.catchCause((cause) =>
2053-
Effect.gen(function* () {
2054-
if (Cause.hasInterruptsOnly(cause) || context.stopped) {
2055-
return;
2098+
);
2099+
2100+
const handleStreamExit = (
2101+
context: ClaudeSessionContext,
2102+
exit: Exit.Exit<void, Error>,
2103+
): Effect.Effect<void> =>
2104+
Effect.gen(function* () {
2105+
if (context.stopped) {
2106+
return;
2107+
}
2108+
2109+
if (Exit.isFailure(exit)) {
2110+
if (isClaudeInterruptedCause(exit.cause)) {
2111+
if (context.turnState) {
2112+
yield* completeTurn(
2113+
context,
2114+
"interrupted",
2115+
interruptionMessageFromClaudeCause(exit.cause),
2116+
);
20562117
}
2057-
const message = toMessage(Cause.squash(cause), "Claude runtime stream failed.");
2058-
yield* emitRuntimeError(context, message, cause);
2118+
} else {
2119+
const message = messageFromClaudeStreamCause(
2120+
exit.cause,
2121+
"Claude runtime stream failed.",
2122+
);
2123+
yield* emitRuntimeError(context, message, Cause.pretty(exit.cause));
20592124
yield* completeTurn(context, "failed", message);
2060-
}),
2061-
),
2062-
);
2125+
}
2126+
} else if (context.turnState) {
2127+
yield* completeTurn(context, "interrupted", "Claude runtime stream ended.");
2128+
}
2129+
2130+
yield* stopSessionInternal(context, {
2131+
emitExitEvent: true,
2132+
});
2133+
});
20632134

20642135
const stopSessionInternal = (
20652136
context: ClaudeSessionContext,
@@ -2096,7 +2167,18 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) {
20962167

20972168
yield* Queue.shutdown(context.promptQueue);
20982169

2099-
context.query.close();
2170+
const streamFiber = context.streamFiber;
2171+
context.streamFiber = undefined;
2172+
if (streamFiber && streamFiber.pollUnsafe() === undefined) {
2173+
yield* Fiber.interrupt(streamFiber);
2174+
}
2175+
2176+
// @effect-diagnostics-next-line tryCatchInEffectGen:off
2177+
try {
2178+
context.query.close();
2179+
} catch (cause) {
2180+
yield* emitRuntimeError(context, "Failed to close Claude runtime query.", cause);
2181+
}
21002182

21012183
const updatedAt = yield* nowIso;
21022184
context.session = {
@@ -2536,6 +2618,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) {
25362618
session,
25372619
promptQueue,
25382620
query: queryRuntime,
2621+
streamFiber: undefined,
25392622
startedAt,
25402623
basePermissionMode: permissionMode,
25412624
resumeSessionId: sessionId,
@@ -2597,7 +2680,17 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) {
25972680
providerRefs: {},
25982681
});
25992682

2600-
Effect.runFork(runSdkStream(context));
2683+
const streamFiber = Effect.runFork(runSdkStream(context));
2684+
context.streamFiber = streamFiber;
2685+
streamFiber.addObserver((exit) => {
2686+
if (context.stopped) {
2687+
return;
2688+
}
2689+
if (context.streamFiber === streamFiber) {
2690+
context.streamFiber = undefined;
2691+
}
2692+
Effect.runFork(handleStreamExit(context, exit));
2693+
});
26012694

26022695
return {
26032696
...session,

0 commit comments

Comments
 (0)