Skip to content

Commit 5daeb38

Browse files
Ascinoccoclaude
andcommitted
fix: address PR pingdotgg#179 review feedback for Claude adapter
Fix detached SDK stream fiber by replacing Effect.runFork with Effect.forkChild and adding explicit Fiber.interrupt on session stop. Add missing threadId to native event logger for per-thread log routing. Remove no-op asCanonicalTurnId identity function. Remove redundant threadId spread in session construction. Fix stale test expectations for thread identity behavior. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b35f89c commit 5daeb38

2 files changed

Lines changed: 27 additions & 24 deletions

File tree

apps/server/src/provider/Layers/ClaudeCodeAdapter.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -546,14 +546,14 @@ describe("ClaudeCodeAdapterLive", () => {
546546
provider: "claudeCode",
547547
runtimeMode: "full-access",
548548
});
549-
assert.equal(session.threadId, undefined);
549+
assert.equal(session.threadId, THREAD_ID);
550550

551551
const turn = yield* adapter.sendTurn({
552552
threadId: session.threadId,
553553
input: "hello",
554554
attachments: [],
555555
});
556-
assert.equal(turn.threadId, undefined);
556+
assert.equal(turn.threadId, THREAD_ID);
557557

558558
harness.query.emit({
559559
type: "stream_event",
@@ -592,13 +592,13 @@ describe("ClaudeCodeAdapterLive", () => {
592592
const sessionStarted = runtimeEvents[0];
593593
assert.equal(sessionStarted?.type, "session.started");
594594
if (sessionStarted?.type === "session.started") {
595-
assert.equal("threadId" in sessionStarted, false);
595+
assert.equal(sessionStarted.threadId, THREAD_ID);
596596
}
597597

598598
const threadStarted = runtimeEvents[4];
599599
assert.equal(threadStarted?.type, "thread.started");
600600
if (threadStarted?.type === "thread.started") {
601-
assert.equal(threadStarted.threadId, "sdk-thread-real");
601+
assert.equal(threadStarted.threadId, THREAD_ID);
602602
}
603603
}).pipe(
604604
Effect.provideService(Random.Random, makeDeterministicRandomService()),
@@ -700,9 +700,9 @@ describe("ClaudeCodeAdapterLive", () => {
700700
runtimeMode: "full-access",
701701
});
702702

703-
assert.equal(session.threadId, "resume-thread-1");
703+
assert.equal(session.threadId, RESUME_THREAD_ID);
704704
assert.deepEqual(session.resumeCursor, {
705-
threadId: "resume-thread-1",
705+
threadId: RESUME_THREAD_ID,
706706
resume: "550e8400-e29b-41d4-a716-446655440000",
707707
resumeSessionAt: "assistant-99",
708708
turnCount: 3,

apps/server/src/provider/Layers/ClaudeCodeAdapter.ts

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import {
3434
ThreadId,
3535
TurnId,
3636
} from "@t3tools/contracts";
37-
import { Cause, DateTime, Deferred, Effect, Layer, Queue, Random, Ref, Stream } from "effect";
37+
import { Cause, DateTime, Deferred, Effect, Fiber, Layer, Queue, Random, Ref, Stream } from "effect";
3838

3939
import {
4040
ProviderAdapterProcessError,
@@ -106,6 +106,7 @@ interface ClaudeSessionContext {
106106
lastAssistantUuid: string | undefined;
107107
lastThreadStartedId: string | undefined;
108108
stopped: boolean;
109+
streamFiber: Fiber.Fiber<void, never> | undefined;
109110
}
110111

111112
interface ClaudeQueryRuntime extends AsyncIterable<SDKMessage> {
@@ -144,10 +145,6 @@ function asRuntimeItemId(value: string): RuntimeItemId {
144145
return RuntimeItemId.makeUnsafe(value);
145146
}
146147

147-
function asCanonicalTurnId(value: TurnId): TurnId {
148-
return value;
149-
}
150-
151148
function asRuntimeRequestId(value: ApprovalRequestId): RuntimeRequestId {
152149
return RuntimeRequestId.makeUnsafe(value);
153150
}
@@ -502,15 +499,16 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
502499
provider: PROVIDER,
503500
createdAt: observedAt,
504501
method: sdkNativeMethod(message),
502+
threadId: context.session.threadId,
505503
...(typeof message.session_id === "string"
506504
? { providerThreadId: message.session_id }
507505
: {}),
508-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
506+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
509507
...(itemId ? { itemId: ProviderItemId.makeUnsafe(itemId) } : {}),
510508
payload: message,
511509
},
512510
},
513-
null,
511+
context.session.threadId,
514512
);
515513
});
516514

@@ -613,7 +611,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
613611
provider: PROVIDER,
614612
createdAt: stamp.createdAt,
615613
threadId: context.session.threadId,
616-
...(turnState ? { turnId: asCanonicalTurnId(turnState.turnId) } : {}),
614+
...(turnState ? { turnId: turnState.turnId } : {}),
617615
payload: {
618616
message,
619617
class: "provider_error",
@@ -640,7 +638,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
640638
provider: PROVIDER,
641639
createdAt: stamp.createdAt,
642640
threadId: context.session.threadId,
643-
...(turnState ? { turnId: asCanonicalTurnId(turnState.turnId) } : {}),
641+
...(turnState ? { turnId: turnState.turnId } : {}),
644642
payload: {
645643
message,
646644
...(detail !== undefined ? { detail } : {}),
@@ -855,7 +853,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
855853
provider: PROVIDER,
856854
createdAt: stamp.createdAt,
857855
threadId: context.session.threadId,
858-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
856+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
859857
itemId: asRuntimeItemId(tool.itemId),
860858
payload: {
861859
itemType: tool.itemType,
@@ -896,7 +894,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
896894
provider: PROVIDER,
897895
createdAt: stamp.createdAt,
898896
threadId: context.session.threadId,
899-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
897+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
900898
itemId: asRuntimeItemId(tool.itemId),
901899
payload: {
902900
itemType: tool.itemType,
@@ -1006,7 +1004,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
10061004
provider: PROVIDER,
10071005
createdAt: stamp.createdAt,
10081006
threadId: context.session.threadId,
1009-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1007+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
10101008
providerRefs: {
10111009
...providerThreadRef(context),
10121010
...(context.turnState ? { providerTurnId: context.turnState.turnId } : {}),
@@ -1165,7 +1163,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
11651163
provider: PROVIDER,
11661164
createdAt: stamp.createdAt,
11671165
threadId: context.session.threadId,
1168-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1166+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
11691167
providerRefs: {
11701168
...providerThreadRef(context),
11711169
...(context.turnState ? { providerTurnId: context.turnState.turnId } : {}),
@@ -1295,6 +1293,11 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
12951293

12961294
context.stopped = true;
12971295

1296+
if (context.streamFiber) {
1297+
yield* Fiber.interrupt(context.streamFiber);
1298+
context.streamFiber = undefined;
1299+
}
1300+
12981301
for (const [requestId, pending] of context.pendingApprovals) {
12991302
yield* Deferred.succeed(pending.decision, "cancel");
13001303
const stamp = yield* makeEventStamp();
@@ -1304,7 +1307,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
13041307
provider: PROVIDER,
13051308
createdAt: stamp.createdAt,
13061309
threadId: context.session.threadId,
1307-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1310+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
13081311
requestId: asRuntimeRequestId(requestId),
13091312
payload: {
13101313
requestType: pending.requestType,
@@ -1442,7 +1445,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
14421445
provider: PROVIDER,
14431446
createdAt: requestedStamp.createdAt,
14441447
threadId: context.session.threadId,
1445-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1448+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
14461449
requestId: asRuntimeRequestId(requestId),
14471450
payload: {
14481451
requestType,
@@ -1494,7 +1497,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
14941497
provider: PROVIDER,
14951498
createdAt: resolvedStamp.createdAt,
14961499
threadId: context.session.threadId,
1497-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1500+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
14981501
requestId: asRuntimeRequestId(requestId),
14991502
payload: {
15001503
requestType,
@@ -1584,7 +1587,6 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
15841587
runtimeMode: input.runtimeMode,
15851588
...(input.cwd ? { cwd: input.cwd } : {}),
15861589
...(input.model ? { model: input.model } : {}),
1587-
...(threadId ? { threadId } : {}),
15881590
resumeCursor: {
15891591
...(threadId ? { threadId } : {}),
15901592
...(resumeState?.resume ? { resume: resumeState.resume } : {}),
@@ -1610,6 +1612,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
16101612
lastAssistantUuid: resumeState?.resumeSessionAt,
16111613
lastThreadStartedId: undefined,
16121614
stopped: false,
1615+
streamFiber: undefined,
16131616
};
16141617
yield* Ref.set(contextRef, context);
16151618
sessions.set(threadId, context);
@@ -1658,7 +1661,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
16581661
providerRefs: {},
16591662
});
16601663

1661-
Effect.runFork(runSdkStream(context));
1664+
context.streamFiber = yield* Effect.forkChild(runSdkStream(context));
16621665

16631666
return {
16641667
...session,

0 commit comments

Comments
 (0)