From e7f358e1af0500bfb8468ab32172cd0522daf922 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 5 Jun 2026 00:43:59 -0400 Subject: [PATCH 1/6] feat(core): interrupt v2 session execution --- AGENTS.md | 2 +- packages/core/src/location-layer.ts | 3 - packages/core/src/public/opencode.ts | 1 + packages/core/src/public/session.ts | 2 + packages/core/src/session.ts | 4 + packages/core/src/session/execution.ts | 7 +- packages/core/src/session/execution/local.ts | 33 +- packages/core/src/session/run-coordinator.ts | 158 ++++--- packages/core/src/session/runner/llm.ts | 4 + packages/core/test/public-opencode.test.ts | 1 + packages/core/test/session-prompt.test.ts | 26 ++ .../core/test/session-run-coordinator.test.ts | 422 ++++++++++++++++++ .../core/test/session-runner-recorded.test.ts | 8 +- packages/core/test/session-runner.test.ts | 85 +++- specs/v2/session.md | 9 +- specs/v2/todo.md | 4 +- 16 files changed, 683 insertions(+), 86 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 6ed0761b8977..f41f61aa10c4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -143,7 +143,7 @@ const table = sqliteTable("session", { - Keep durable prompt admission separate from model execution. `SessionV2.prompt(...)` admits one durable `session_input` row before scheduling advisory `SessionExecution.wake(sessionID)` unless `resume: false` requests admit-only behavior. The serialized runner promotes admitted inputs into visible user messages at safe boundaries. - Reusing a Session ID adopts the existing Session. Reusing a prompt message ID reconciles an exact retry only when Session, prompt, and delivery mode match; conflicting reuse fails. Historical projected prompts lazily synthesize promoted inbox records during exact retry. -- Keep `SessionExecution` process-global and Session-ID based. It discovers placement through the read-side `SessionStore` and `LocationServiceMap.get(session.location)`; no layer should take a Session ID. +- Keep `SessionExecution` process-global and Session-ID based. Its local implementation owns the process-local Session coordinator and discovers placement through `SessionStore` plus `LocationServiceMap.get(session.location)` only when a drain starts; no layer should take a Session ID. V2 interruption targets the active process-local ownership chain for that Session; idle or missing interruption is a no-op. - Keep `SessionRunner`, model resolution, tool registry, permissions, and filesystem Location-scoped. Omitted `Location.workspaceID` means implicit-local placement; explicit workspace identity remains reserved for future placement semantics. - Preserve one explicit `llm.stream(request)` call per provider turn and reload projected history before durable continuation. Do not bridge through legacy `SessionPrompt.loop(...)` or delegate orchestration to an in-memory tool loop. - Keep local Session drains process-local until clustering is implemented. `SessionRunCoordinator` joins explicit same-Session resumes, coalesces prompt wakeups, and allows different Sessions to run concurrently. Advisory wakes drain eligible durable inbox rows only; post-crash activity recovery requires a separate explicit design before it may retry provider work. diff --git a/packages/core/src/location-layer.ts b/packages/core/src/location-layer.ts index c1621c404f66..15561acb6a68 100644 --- a/packages/core/src/location-layer.ts +++ b/packages/core/src/location-layer.ts @@ -39,7 +39,6 @@ import { LLMClient } from "@opencode-ai/llm" import { RequestExecutor } from "@opencode-ai/llm/route" import * as SessionRunnerLLM from "./session/runner/llm" import { SessionRunnerModel } from "./session/runner/model" -import { SessionRunCoordinator } from "./session/run-coordinator" import { SystemContextBuiltIns } from "./system-context-builtins" import { FetchHttpClient } from "effect/unstable/http" @@ -81,7 +80,6 @@ export class LocationServiceMap extends LayerMap.Service()(" ) const model = SessionRunnerModel.locationLayer.pipe(Layer.provide(services)) const runner = SessionRunnerLLM.defaultLayer.pipe(Layer.provide(services), Layer.provide(model)) - const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner)) return Layer.mergeAll( services, commits, @@ -91,7 +89,6 @@ export class LocationServiceMap extends LayerMap.Service()(" questions, model, runner, - coordinator, builtInTools, ).pipe(Layer.fresh) }, diff --git a/packages/core/src/public/opencode.ts b/packages/core/src/public/opencode.ts index 8ec58de4d550..4e85d5fe1144 100644 --- a/packages/core/src/public/opencode.ts +++ b/packages/core/src/public/opencode.ts @@ -51,6 +51,7 @@ export const layer = Layer.effect( }), get: sessions.get, list: sessions.list, + interrupt: sessions.interrupt, prompt: (input) => sessions.prompt({ id: input.id, diff --git a/packages/core/src/public/session.ts b/packages/core/src/public/session.ts index f66fe2b0881f..f4b5c79620e9 100644 --- a/packages/core/src/public/session.ts +++ b/packages/core/src/public/session.ts @@ -84,6 +84,8 @@ export interface Interface { readonly get: (sessionID: ID) => Effect.Effect readonly list: (input?: ListInput) => Effect.Effect readonly prompt: (input: PromptInput) => Effect.Effect + /** Interrupt the active V2 execution chain for one Session on this process. Interrupting an idle or missing Session is a no-op. */ + readonly interrupt: (sessionID: ID) => Effect.Effect readonly messages: (input: MessagesInput) => Effect.Effect readonly message: (input: MessageInput) => Effect.Effect readonly context: (sessionID: ID) => Effect.Effect diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 6ccc46da6c84..e6885891f93c 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -155,6 +155,7 @@ export interface Interface { readonly compact: (input: CompactInput) => Effect.Effect readonly wait: (id: SessionSchema.ID) => Effect.Effect readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect + readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect } export class Service extends Context.Service()("@opencode/v2/Session") {} @@ -399,6 +400,9 @@ export const layer = Layer.effect( yield* result.get(sessionID) yield* execution.resume(sessionID) }), + interrupt: Effect.fn("V2Session.interrupt")(function* (sessionID) { + yield* execution.interrupt(sessionID) + }), }) return result diff --git a/packages/core/src/session/execution.ts b/packages/core/src/session/execution.ts index 9c5f9f4b4f61..af422a9e2e98 100644 --- a/packages/core/src/session/execution.ts +++ b/packages/core/src/session/execution.ts @@ -9,10 +9,15 @@ export interface Interface { readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect /** Schedule a drain after durable work is recorded. Repeated wakeups may coalesce. */ readonly wake: (sessionID: SessionSchema.ID) => Effect.Effect + /** Interrupt active work owned by this process. Idle interruption is a no-op. */ + readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect } /** Routes execution from a Session ID to the runner owned by that Session's Location. */ export class Service extends Context.Service()("@opencode/v2/SessionExecution") {} /** Low-level compatibility layer for callers that only need durable Session recording. */ -export const noopLayer = Layer.succeed(Service, Service.of({ resume: () => Effect.void, wake: () => Effect.void })) +export const noopLayer = Layer.succeed( + Service, + Service.of({ resume: () => Effect.void, wake: () => Effect.void, interrupt: () => Effect.void }), +) diff --git a/packages/core/src/session/execution/local.ts b/packages/core/src/session/execution/local.ts index 478cecfc2c02..f933d43d6ca3 100644 --- a/packages/core/src/session/execution/local.ts +++ b/packages/core/src/session/execution/local.ts @@ -1,6 +1,7 @@ import { Effect, Layer } from "effect" import { LocationServiceMap } from "../../location-layer" import { SessionRunCoordinator } from "../run-coordinator" +import { SessionRunner } from "../runner" import { SessionSchema } from "../schema" import { SessionStore } from "../store" import { SessionExecution } from "../execution" @@ -11,25 +12,25 @@ export const layer = Layer.effect( Effect.gen(function* () { const store = yield* SessionStore.Service const locations = yield* LocationServiceMap - const scope = yield* Effect.scope - const withCoordinator = Effect.fnUntraced(function* ( - sessionID: SessionSchema.ID, - use: (coordinator: SessionRunCoordinator.Interface) => Effect.Effect, - ) { - const session = yield* store.get(sessionID) - if (!session) return yield* Effect.die(`Session not found: ${sessionID}`) - return yield* SessionRunCoordinator.Service.use(use).pipe(Effect.provide(locations.get(session.location))) + const coordinator = yield* SessionRunCoordinator.make({ + drain: Effect.fnUntraced(function* (sessionID: SessionSchema.ID, mode) { + const session = yield* store.get(sessionID) + if (!session) return yield* Effect.die(`Session not found: ${sessionID}`) + return yield* SessionRunner.Service.use((runner) => runner.run({ sessionID, force: mode === "run" })).pipe( + Effect.provide(locations.get(session.location)), + ) + }), + onFailure: (sessionID, cause) => + Effect.logError("Failed to drain Session").pipe( + Effect.annotateLogs("sessionID", sessionID), + Effect.annotateLogs("cause", cause), + ), }) return SessionExecution.Service.of({ - resume: Effect.fn("SessionExecution.resume")(function* (sessionID) { - return yield* withCoordinator(sessionID, (coordinator) => coordinator.run(sessionID)) - }), - wake: Effect.fn("SessionExecution.wake")(function* (sessionID) { - yield* withCoordinator(sessionID, (coordinator) => - coordinator.wake(sessionID).pipe(Effect.andThen(coordinator.awaitIdle(sessionID))), - ).pipe(Effect.forkIn(scope), Effect.asVoid) - }), + interrupt: coordinator.interrupt, + resume: coordinator.run, + wake: coordinator.wake, }) }), ) diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index c5bed8a42b50..a60010594323 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -1,6 +1,6 @@ export * as SessionRunCoordinator from "./run-coordinator" -import { Cause, Context, Deferred, Effect, Exit, FiberSet, Layer, Scope } from "effect" +import { Cause, Context, Deferred, Effect, Exit, Fiber, FiberSet, Layer, Scope } from "effect" import { SessionRunner } from "./runner" import { SessionSchema } from "./schema" @@ -18,6 +18,9 @@ export type Mode = "run" | "wake" * * `wake` reports that durable work may now be available. It starts a chain while idle or * requests one coalesced follow-up while draining. Repeated wakes collapse together. + * + * `interrupt` stops the current ownership chain. Wakes and explicit runs arriving after the + * interruption request become a fresh successor; previously queued reruns are suppressed. */ export interface Coordinator { /** Starts or joins one explicit drain generation. */ @@ -26,13 +29,19 @@ export interface Coordinator { readonly wake: (key: Key) => Effect.Effect /** Waits until the current ownership chain settles. */ readonly awaitIdle: (key: Key) => Effect.Effect + /** Interrupts the active ownership chain. Later requests may start a fresh successor. */ + readonly interrupt: (key: Key) => Effect.Effect } type Entry = { readonly done: Deferred.Deferred + readonly settled: Deferred.Deferred> mode: Mode rerun?: Mode explicit?: Deferred.Deferred + successorExplicit?: Deferred.Deferred + owner?: Fiber.Fiber + stopping: boolean } const strongest = (left: Mode | undefined, right: Mode): Mode => (left === "run" || right === "run" ? "run" : "wake") @@ -44,7 +53,7 @@ export const make = (options: { }): Effect.Effect, never, Scope.Scope> => Effect.gen(function* () { const active = new Map>() - const scope = yield* Effect.scope + const report = yield* FiberSet.makeRuntime() const fork = yield* FiberSet.makeRuntime() const shutdown = Deferred.makeUnsafe() let closed = false @@ -58,49 +67,77 @@ export const make = (options: { const makeEntry = (mode: Mode, explicit?: Deferred.Deferred): Entry => ({ done: Deferred.makeUnsafe(), + settled: Deferred.makeUnsafe>(), mode, explicit, + stopping: false, }) - const start = (key: Key, entry: Entry, mode: Mode) => { - fork(own(key, entry, mode)) + const start = (key: Key, entry: Entry, mode: Mode, successor = false) => { + const ready = Deferred.makeUnsafe() + const drain = Effect.suspend(() => options.drain(key, mode)) + // Initial work retains immediate-start behavior but cannot run before ownership is published. + // Observer-started successors yield once so synchronous drains cannot recurse on the JS stack. + const owner = fork( + (successor ? Effect.yieldNow.pipe(Effect.andThen(drain)) : Deferred.await(ready).pipe(Effect.andThen(drain))).pipe( + Effect.onExit((exit) => Effect.sync(() => settle(key, entry, mode, exit))), + Effect.exit, + Effect.asVoid, + ), + ) + entry.owner = owner + if (!successor) Deferred.doneUnsafe(ready, Effect.void) } - const own = (key: Key, entry: Entry, mode: Mode): Effect.Effect => - Effect.suspend(() => options.drain(key, mode)).pipe( - Effect.exit, - Effect.flatMap((exit) => { - if (closed) return Deferred.done(entry.done, exit).pipe(Effect.asVoid) - if (mode === "run" && entry.explicit !== undefined) { - Deferred.doneUnsafe(entry.explicit, exit) - entry.explicit = undefined - } - if (exit._tag === "Success") { - if (active.get(key) !== entry) return Deferred.done(entry.done, exit).pipe(Effect.asVoid) - if (entry.rerun !== undefined) { - const mode = entry.rerun - entry.rerun = undefined - entry.mode = mode - return own(key, entry, mode) - } - active.delete(key) - return Deferred.done(entry.done, exit).pipe(Effect.asVoid) - } - - const successor = - active.get(key) === entry && entry.rerun !== undefined ? makeEntry(entry.rerun, entry.explicit) : undefined - if (successor === undefined) active.delete(key) - else { - active.set(key, successor) - } - if (successor !== undefined) start(key, successor, successor.mode) - const report = - mode === "wake" && options.onFailure !== undefined - ? options.onFailure(key, exit.cause).pipe(Effect.forkIn(scope), Effect.asVoid) - : Effect.void - return Deferred.done(entry.done, exit).pipe(Effect.andThen(report), Effect.asVoid) - }), - ) + const settle = (key: Key, entry: Entry, mode: Mode, exit: Exit.Exit) => { + if (closed) { + Deferred.doneUnsafe(entry.done, exit) + Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) + return + } + if (mode === "run" && entry.explicit !== undefined) { + Deferred.doneUnsafe(entry.explicit, exit) + entry.explicit = undefined + } + if (entry.stopping && mode === "wake" && entry.explicit !== undefined) { + Deferred.doneUnsafe(entry.explicit, exit) + entry.explicit = undefined + } + if (active.get(key) !== entry) { + Deferred.doneUnsafe(entry.done, exit) + Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) + return + } + if (exit._tag === "Success" && !entry.stopping) { + if (entry.rerun !== undefined) { + const mode = entry.rerun + entry.rerun = undefined + entry.mode = mode + start(key, entry, mode, true) + return + } + active.delete(key) + Deferred.doneUnsafe(entry.done, exit) + Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) + return + } + + const successorExplicit = entry.successorExplicit ?? (mode === "wake" ? entry.explicit : undefined) + const successor = entry.rerun !== undefined ? makeEntry(entry.rerun, successorExplicit) : undefined + if (successor === undefined) active.delete(key) + else active.set(key, successor) + if (successor !== undefined) start(key, successor, successor.mode, true) + Deferred.doneUnsafe(entry.done, exit) + Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) + if ( + exit._tag === "Failure" && + !(entry.stopping && Cause.hasInterruptsOnly(exit.cause)) && + mode === "wake" && + options.onFailure !== undefined + ) { + report(Effect.suspend(() => options.onFailure!(key, exit.cause))) + } + } const wake = (key: Key) => Effect.sync(() => { @@ -123,7 +160,7 @@ export const make = (options: { const entry = active.get(key) if (entry === undefined) break const exit = yield* Effect.raceFirst( - Deferred.await(entry.done).pipe(Effect.exit), + Deferred.await(entry.settled), Deferred.await(shutdown).pipe(Effect.as(Exit.void)), ) if (closed) break @@ -132,13 +169,30 @@ export const make = (options: { if (firstFailure !== undefined) return yield* Effect.failCause(firstFailure) }) - return { run, wake, awaitIdle } + const interrupt = (key: Key): Effect.Effect => + Effect.suspend(() => { + const entry = active.get(key) + if (entry?.owner === undefined) return Effect.void + if (!entry.stopping) { + entry.stopping = true + entry.rerun = undefined + } + return Fiber.interrupt(entry.owner) + }) + + return { run, wake, awaitIdle, interrupt } function run(key: Key): Effect.Effect { return Effect.uninterruptibleMask((restore) => { if (closed) return Effect.interrupt const entry = active.get(key) if (entry !== undefined) { + if (entry.stopping) { + entry.rerun = strongest(entry.rerun, "run") + entry.successorExplicit ??= + entry.mode === "wake" ? (entry.explicit ?? Deferred.makeUnsafe()) : Deferred.makeUnsafe() + return restore(awaitRun(entry.successorExplicit)) + } if (entry.mode === "wake") { entry.rerun = "run" entry.explicit ??= Deferred.makeUnsafe() @@ -165,19 +219,17 @@ export class Service extends Context.Service()("@opencode/v2 export const layer = Layer.effect( Service, - Effect.gen(function* () { - const runner = yield* SessionRunner.Service - return Service.of( - yield* make({ + SessionRunner.Service.pipe( + Effect.flatMap((runner) => + make({ drain: (sessionID, mode) => runner.run({ sessionID, force: mode === "run" }), onFailure: (sessionID, cause) => - Cause.hasInterruptsOnly(cause) - ? Effect.void - : Effect.logError("Failed to drain Session").pipe( - Effect.annotateLogs("sessionID", sessionID), - Effect.annotateLogs("cause", cause), - ), + Effect.logError("Failed to drain Session").pipe( + Effect.annotateLogs("sessionID", sessionID), + Effect.annotateLogs("cause", cause), + ), }), - ) - }), + ), + Effect.map(Service.of), + ), ) diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index f4b62e09fc38..f1f78b8b1dee 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -16,6 +16,7 @@ import { SessionInput } from "../input" import { QuestionV2 } from "../../question" import { SystemContextRegistry } from "../../system-context-registry" import { SessionContextEpoch } from "../context-epoch" +import { Location } from "../../location" /** * Runs one durable coding-agent Session until it settles. @@ -87,6 +88,7 @@ export const layer = Layer.effect( const tools = yield* ToolRegistry.Service const models = yield* SessionRunnerModel.Service const store = yield* SessionStore.Service + const location = yield* Location.Service const systemContext = yield* SystemContextRegistry.Service const db = (yield* Database.Service).db const getSession = Effect.fn("SessionRunner.getSession")(function* (sessionID: SessionSchema.ID) { @@ -132,6 +134,8 @@ export const layer = Layer.effect( promotion: "steer" | "queue" | undefined, ) { const session = yield* getSession(sessionID) + if (session.location.directory !== location.directory || session.location.workspaceID !== location.workspaceID) + return yield* Effect.interrupt const initialized = yield* SessionContextEpoch.initialize(db, systemContext, session.id, session.location) const model = yield* models.resolve(session) const toolFibers = yield* FiberSet.make() diff --git a/packages/core/test/public-opencode.test.ts b/packages/core/test/public-opencode.test.ts index 05cc7c3b86a3..c86a3c28bb66 100644 --- a/packages/core/test/public-opencode.test.ts +++ b/packages/core/test/public-opencode.test.ts @@ -17,6 +17,7 @@ describe("public native OpenCode API", () => { "create", "events", "get", + "interrupt", "list", "message", "messages", diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index d73170ef6bcc..1d46205e1871 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -23,6 +23,7 @@ const events = EventV2.layer.pipe(Layer.provide(database)) const projector = SessionProjector.layer.pipe(Layer.provide(events), Layer.provide(database)) const store = SessionStore.layer.pipe(Layer.provide(database)) const executionCalls: SessionV2.ID[] = [] +const interruptCalls: SessionV2.ID[] = [] const wakeCalls: SessionV2.ID[] = [] const execution = Layer.succeed( SessionExecution.Service, @@ -31,6 +32,10 @@ const execution = Layer.succeed( Effect.sync(() => { executionCalls.push(sessionID) }), + interrupt: (sessionID) => + Effect.sync(() => { + interruptCalls.push(sessionID) + }), wake: (sessionID) => Effect.sync(() => { wakeCalls.push(sessionID) @@ -108,6 +113,27 @@ describe("SessionV2.prompt", () => { }), ) + it.effect("delegates interruption through SessionExecution", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + interruptCalls.length = 0 + + yield* session.interrupt(sessionID) + expect(interruptCalls).toEqual([sessionID]) + }), + ) + + it.effect("delegates interruption without requiring a recorded Session", () => + Effect.gen(function* () { + const session = yield* SessionV2.Service + interruptCalls.length = 0 + + yield* session.interrupt(SessionV2.ID.make("ses_missing")) + expect(interruptCalls).toEqual([SessionV2.ID.make("ses_missing")]) + }), + ) + it.effect("durably admits one user message before transcript promotion", () => Effect.gen(function* () { yield* setup diff --git a/packages/core/test/session-run-coordinator.test.ts b/packages/core/test/session-run-coordinator.test.ts index 8f67662ccd6f..124ecf42906b 100644 --- a/packages/core/test/session-run-coordinator.test.ts +++ b/packages/core/test/session-run-coordinator.test.ts @@ -41,6 +41,329 @@ describe("SessionRunCoordinator", () => { ), ) + it.effect("does nothing when interrupted while idle", () => + Effect.scoped( + Effect.gen(function* () { + const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.void }) + + yield* coordinator.interrupt("session") + }), + ), + ) + + it.effect("interrupts only the requested key", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + const secondGate = yield* Deferred.make() + const secondInterrupted = yield* Deferred.make() + const coordinator = yield* SessionRunCoordinator.make({ + drain: (key: string) => + key === "first" + ? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never)) + : Deferred.succeed(secondStarted, undefined).pipe( + Effect.andThen(Deferred.await(secondGate)), + Effect.onInterrupt(() => Deferred.succeed(secondInterrupted, undefined)), + ), + }) + + yield* coordinator.wake("first") + yield* coordinator.wake("second") + yield* Effect.all([Deferred.await(firstStarted), Deferred.await(secondStarted)]) + + yield* coordinator.interrupt("first") + expect(yield* Deferred.isDone(secondInterrupted)).toBeFalse() + yield* Deferred.succeed(secondGate, undefined) + yield* coordinator.awaitIdle("second") + }), + ), + ) + + it.effect("interrupts the active drain and suppresses its queued wake", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const interrupted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => Deferred.succeed(interrupted, undefined)), + ) + : Effect.void, + ), + ), + }) + + const run = yield* coordinator.run("session").pipe(Effect.forkChild) + yield* Deferred.await(firstStarted) + yield* coordinator.wake("session") + + yield* coordinator.interrupt("session") + yield* Deferred.await(interrupted) + yield* coordinator.awaitIdle("session") + const exit = yield* Fiber.await(run) + expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue() + expect(runs).toBe(1) + yield* coordinator.interrupt("session") + }), + ), + ) + + it.effect("allows a wake received after interruption to start a successor", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const firstInterrupted = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => Deferred.succeed(firstInterrupted, undefined)), + ) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Effect.yieldNow + yield* coordinator.wake("session") + yield* Deferred.await(firstInterrupted) + yield* Fiber.join(interrupt) + yield* Deferred.await(secondStarted) + yield* coordinator.awaitIdle("session") + + expect(runs).toBe(2) + }), + ), + ) + + it.effect("interrupts an explicit run queued before the interruption request", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never)) + : Effect.void, + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const run = yield* coordinator.run("session").pipe(Effect.forkChild) + yield* Effect.yieldNow + + yield* coordinator.interrupt("session") + const exit = yield* Fiber.await(run) + expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue() + expect(runs).toBe(1) + }), + ), + ) + + it.effect("settles a pre-interrupt explicit run only after active wake cleanup", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const runSettled = yield* Deferred.make() + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Deferred.succeed(started, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(started) + const run = yield* coordinator + .run("session") + .pipe(Effect.exit, Effect.ensuring(Deferred.succeed(runSettled, undefined)), Effect.forkChild) + const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.await(cleanupStarted) + + expect(yield* Deferred.isDone(runSettled)).toBeFalse() + yield* Deferred.succeed(cleanupGate, undefined) + const runExit = yield* Fiber.join(run) + expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue() + yield* Fiber.join(interrupt) + }), + ), + ) + + it.effect("routes an explicit run arriving during interrupt cleanup to the successor", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.await(cleanupStarted) + const run = yield* coordinator.run("session").pipe(Effect.forkChild) + yield* Deferred.succeed(cleanupGate, undefined) + yield* Fiber.join(interrupt) + yield* Fiber.join(run) + yield* Deferred.await(secondStarted) + expect(runs).toBe(2) + }), + ), + ) + + it.effect("waits for interrupt cleanup before settling callers", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const runSettled = yield* Deferred.make() + const idleSettled = yield* Deferred.make() + const interruptSettled = yield* Deferred.make() + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Deferred.succeed(started, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ), + }) + + const run = yield* coordinator + .run("session") + .pipe(Effect.ensuring(Deferred.succeed(runSettled, undefined)), Effect.forkChild) + yield* Deferred.await(started) + const idle = yield* coordinator + .awaitIdle("session") + .pipe(Effect.exit, Effect.ensuring(Deferred.succeed(idleSettled, undefined)), Effect.forkChild) + const interrupt = yield* coordinator + .interrupt("session") + .pipe(Effect.ensuring(Deferred.succeed(interruptSettled, undefined)), Effect.forkChild) + yield* Deferred.await(cleanupStarted) + + expect(yield* Deferred.isDone(runSettled)).toBeFalse() + expect(yield* Deferred.isDone(idleSettled)).toBeFalse() + expect(yield* Deferred.isDone(interruptSettled)).toBeFalse() + yield* Deferred.succeed(cleanupGate, undefined) + const runExit = yield* Fiber.await(run) + const idleExit = yield* Fiber.join(idle) + expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue() + expect(Exit.isFailure(idleExit) && Cause.hasInterruptsOnly(idleExit.cause)).toBeTrue() + yield* Fiber.join(interrupt) + }), + ), + ) + + it.effect("joins concurrent interruption requests for one active drain", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Deferred.succeed(started, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(started) + const first = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.await(cleanupStarted) + const second = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.succeed(cleanupGate, undefined) + + yield* Fiber.join(first) + yield* Fiber.join(second) + }), + ), + ) + + it.effect("does not discard a post-interrupt successor when interrupted again", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const firstInterrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.await(cleanupStarted) + const run = yield* coordinator.run("session").pipe(Effect.forkChild) + const secondInterrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.succeed(cleanupGate, undefined) + + yield* Effect.all([Fiber.join(firstInterrupt), Fiber.join(secondInterrupt), Fiber.join(run)]) + yield* Deferred.await(secondStarted) + expect(runs).toBe(2) + }), + ), + ) + it.effect("coalesces wakes received during an active run", () => Effect.scoped( Effect.gen(function* () { @@ -381,4 +704,103 @@ describe("SessionRunCoordinator", () => { }), ), ) + + it.effect("reports an advisory drain failure exactly once", () => + Effect.scoped( + Effect.gen(function* () { + const failure = new Error("wake failed") + const reported: Cause.Cause[] = [] + const reportedOnce = yield* Deferred.make() + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => Effect.fail(failure), + onFailure: (_key, cause) => + Effect.sync(() => reported.push(cause)).pipe(Effect.andThen(Deferred.succeed(reportedOnce, undefined))), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(reportedOnce) + yield* Effect.yieldNow + + expect(reported).toHaveLength(1) + expect(Cause.squash(reported[0]!)).toBe(failure) + }), + ), + ) + + it.effect("contains defects thrown while constructing an advisory failure report", () => + Effect.scoped( + Effect.gen(function* () { + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => Effect.fail(new Error("wake failed")), + onFailure: () => { + throw new Error("report defect") + }, + }) + + yield* coordinator.wake("session") + yield* coordinator.awaitIdle("session").pipe(Effect.exit) + yield* coordinator.wake("session") + yield* coordinator.awaitIdle("session").pipe(Effect.exit) + }), + ), + ) + + it.effect("reports an independently interrupted advisory drain", () => + Effect.scoped( + Effect.gen(function* () { + const reported = yield* Deferred.make>() + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => Effect.interrupt, + onFailure: (_key, cause) => Deferred.succeed(reported, cause).pipe(Effect.asVoid), + }) + + yield* coordinator.wake("session") + + expect(Cause.hasInterruptsOnly(yield* Deferred.await(reported))).toBeTrue() + }), + ), + ) + + it.effect("does not report deliberate interruption as an advisory failure", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const reported: Cause.Cause[] = [] + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => Deferred.succeed(started, undefined).pipe(Effect.andThen(Effect.never)), + onFailure: (_key, cause) => Effect.sync(() => reported.push(cause)), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(started) + yield* coordinator.interrupt("session") + yield* Effect.yieldNow + + expect(reported).toEqual([]) + }), + ), + ) + + it.effect("trampolines many synchronous self-waking drains", () => + Effect.scoped( + Effect.gen(function* () { + const limit = 20_000 + let runs = 0 + let wake: (key: string) => Effect.Effect = () => Effect.void + const coordinator = yield* SessionRunCoordinator.make({ + drain: (key) => + Effect.sync(() => ++runs).pipe( + Effect.tap((run) => (run < limit ? wake(key) : Effect.void)), + Effect.asVoid, + ), + }) + wake = coordinator.wake + + yield* coordinator.wake("session") + yield* coordinator.awaitIdle("session") + + expect(runs).toBe(limit) + }), + ), + ) }) diff --git a/packages/core/test/session-runner-recorded.test.ts b/packages/core/test/session-runner-recorded.test.ts index 376b2e3452f0..e837c53b7b90 100644 --- a/packages/core/test/session-runner-recorded.test.ts +++ b/packages/core/test/session-runner-recorded.test.ts @@ -20,6 +20,7 @@ import { ToolRegistry } from "@opencode-ai/core/tool/registry" import { SessionTable } from "@opencode-ai/core/session/sql" import { SessionStore } from "@opencode-ai/core/session/store" import { SystemContextRegistry } from "@opencode-ai/core/system-context-registry" +import { Location } from "@opencode-ai/core/location" import { describe, expect } from "bun:test" import { eq } from "drizzle-orm" import { Effect, Layer } from "effect" @@ -57,6 +58,7 @@ const model = OpenAIChat.route .model({ id: "gpt-4o-mini" }) const models = SessionRunnerModel.layerWith(() => Effect.succeed(model)) const systemContext = SystemContextRegistry.layer +const location = Location.layer({ directory: AbsolutePath.make("/project") }).pipe(Layer.provide(Project.defaultLayer)) const runner = SessionRunnerLLM.defaultLayer.pipe( Layer.provide(database), Layer.provide(store), @@ -65,12 +67,15 @@ const runner = SessionRunnerLLM.defaultLayer.pipe( Layer.provide(registry), Layer.provide(models), Layer.provide(systemContext), + Layer.provide(location), ) const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner)) const execution = Layer.effect( SessionExecution.Service, SessionRunCoordinator.Service.pipe( - Effect.map((coordinator) => SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake })), + Effect.map((coordinator) => + SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake, interrupt: coordinator.interrupt }), + ), ), ).pipe(Layer.provide(coordinator)) const sessions = SessionV2.layer.pipe( @@ -92,6 +97,7 @@ const it = testEffect( registry, models, systemContext, + location, runner, coordinator, execution, diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 160884402ed5..6d202cd2e042 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -42,6 +42,7 @@ import { SessionStore } from "@opencode-ai/core/session/store" import { SystemContext } from "@opencode-ai/core/system-context" import { SystemContextRegistry } from "@opencode-ai/core/system-context-registry" import { ModelV2 } from "@opencode-ai/core/model" +import { Location } from "@opencode-ai/core/location" import { ProviderV2 } from "@opencode-ai/core/provider" import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect" import { asc, eq } from "drizzle-orm" @@ -180,6 +181,7 @@ const systemContext = Layer.effectDiscard( ), ), ).pipe(Layer.provideMerge(SystemContextRegistry.layer)) +const location = Location.layer({ directory: AbsolutePath.make("/project") }).pipe(Layer.provide(Project.defaultLayer)) const runner = SessionRunnerLLM.layer.pipe( Layer.provide(database), Layer.provide(store), @@ -188,12 +190,15 @@ const runner = SessionRunnerLLM.layer.pipe( Layer.provide(registry), Layer.provide(models), Layer.provide(systemContext), + Layer.provide(location), ) const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner)) const execution = Layer.effect( SessionExecution.Service, SessionRunCoordinator.Service.pipe( - Effect.map((coordinator) => SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake })), + Effect.map((coordinator) => + SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake, interrupt: coordinator.interrupt }), + ), ), ).pipe(Layer.provide(coordinator)) const sessions = SessionV2.layer.pipe( @@ -217,6 +222,7 @@ const it = testEffect( echo, models, systemContext, + location, runner, coordinator, execution, @@ -596,7 +602,7 @@ describe("SessionRunnerLLM", () => { }), ) - it.effect("requires a complete new baseline after a Session moves", () => + it.effect("interrupts a source Location runner after a Session moves", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -620,12 +626,10 @@ describe("SessionRunnerLLM", () => { .get(), ).toBeUndefined() - systemUnavailable = true yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) const exit = yield* session.resume(sessionID).pipe(Effect.exit) - expect(Exit.isFailure(exit)).toBe(true) - if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(SystemContext.InitializationBlocked) + expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBe(true) expect(requests).toHaveLength(1) expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) }), @@ -1558,6 +1562,48 @@ describe("SessionRunnerLLM", () => { }), ) + it.effect("preserves durable queued input for a later wake after interruption", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + const coordinator = yield* SessionRunCoordinator.Service + yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false }) + + requests.length = 0 + responses = [ + [], + [ + LLMEvent.stepStart({ index: 0 }), + LLMEvent.stepFinish({ index: 0, reason: "stop" }), + LLMEvent.finish({ reason: "stop" }), + ], + ] + streamGate = yield* Deferred.make() + streamStarted = yield* Deferred.make() + + const run = yield* session.resume(sessionID).pipe(Effect.forkChild) + yield* Deferred.await(streamStarted) + yield* session.prompt({ + sessionID, + prompt: new Prompt({ text: "Run after interrupt" }), + delivery: "queue", + }) + yield* session.interrupt(sessionID) + expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) + expect(requests).toHaveLength(1) + yield* coordinator.wake(sessionID) + while (requests.length < 2) yield* Effect.yieldNow + yield* Deferred.succeed(streamGate, undefined) + yield* coordinator.awaitIdle(sessionID) + streamGate = undefined + streamStarted = undefined + + expect(requests).toHaveLength(2) + expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"]) + expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Run after interrupt"]) + }), + ) + it.effect("runs queued active inputs as separate FIFO activities", () => Effect.gen(function* () { yield* setup @@ -2269,13 +2315,13 @@ describe("SessionRunnerLLM", () => { Stream.never, ) - const runner = yield* SessionRunner.Service - const run = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild) + const run = yield* session.resume(sessionID).pipe(Effect.forkChild) while (executions.length === 0) yield* Effect.yieldNow - yield* Fiber.interrupt(run) + yield* session.interrupt(sessionID) toolExecutionGate = undefined expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) + yield* session.interrupt(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Interrupt blocked tool" }, { @@ -2304,6 +2350,29 @@ describe("SessionRunnerLLM", () => { }), ) + it.effect("interrupts a blocked provider turn without local tool activity", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt provider" }), resume: false }) + requests.length = 0 + response = [] + streamGate = yield* Deferred.make() + streamStarted = yield* Deferred.make() + + const run = yield* session.resume(sessionID).pipe(Effect.forkChild) + yield* Deferred.await(streamStarted) + yield* session.interrupt(sessionID) + const exit = yield* Fiber.await(run) + streamGate = undefined + streamStarted = undefined + + expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue() + expect(requests).toHaveLength(1) + yield* session.interrupt(sessionID) + }), + ) + it.effect("durably fails blocked local tools when interrupted while awaiting settlement", () => Effect.gen(function* () { yield* setup diff --git a/specs/v2/session.md b/specs/v2/session.md index 00f7c2ba7265..47eec3cf44b1 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -18,6 +18,13 @@ sessions.prompt({ id?, sessionID, prompt, delivery?, resume? }) -> exact retry schedules another wake unless resume is false -> resume omitted or true schedules execution after admission -> resume false admits only + +sessions.interrupt(sessionID) + -> interrupts the active ownership chain on this process + -> waits for active drain cleanup and settlement + -> suppresses reruns already queued before interruption + -> preserves durable inbox rows for a later fresh wake or resume + -> idle or missing Session is a no-op ``` `session_input` is the durable admission inbox. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `PromptLifecycle.Promoted`. The projector atomically writes the visible user message and marks its inbox row promoted in the same event transaction. The legacy V1-to-V2 shadow bridge continues publishing ordinary `Prompted` events for already-visible V1 prompts. @@ -111,7 +118,7 @@ Execution has two entry points: Post-crash activity recovery is intentionally deferred. A wake does not infer that ambiguous provider work is safe to retry after an input has already been promoted. Explicit `run` may deliberately continue from durable projected history. A future recovery slice should model durable activity identity, provider-dispatch ambiguity, required continuation, queue-opener reservation, retry policy, and visible recovery status together. -A location-scoped `SessionRunCoordinator` serializes each Session drain chain while allowing different Sessions to drain concurrently. Automatic startup discovery, durable multi-node ownership, stale-owner fencing, interruption controls, and retry policy remain future work. +A process-global `SessionRunCoordinator` serializes each local Session drain chain while allowing different Sessions to drain concurrently. It enters the Session's current Location only when a drain starts, so interruption targets process execution ownership rather than Location cache identity. Interruption establishes a local ownership-chain boundary by stopping the current chain while preserving pending/unpromoted durable inbox rows for a later fresh wake and projected history for explicit resume. A Location runner also fences every new provider turn against its captured Location so a moved Session cannot begin another turn through source-Location tools or context. An already-dispatched provider turn may still settle source-Location calls until a future move-control slice interrupts active ownership. Automatic startup discovery, durable multi-node ownership, stale-owner fencing, and retry policy remain future work. Inbox promotion coalesces pending steers in durable admission order and opens one queued activity at a time in FIFO order. Add explicit inbox backlog and steering-batch limits before exposing broad multi-caller admission or untrusted queue growth. diff --git a/specs/v2/todo.md b/specs/v2/todo.md index ee18ebbd7bf9..002139cdd5ee 100644 --- a/specs/v2/todo.md +++ b/specs/v2/todo.md @@ -33,7 +33,7 @@ through legacy `SessionPrompt.loop(...)`: Prompt admission now uses a durable `session_input` inbox rather than immediate transcript projection. `steer` inputs coalesce into the active activity at the next safe provider-turn boundary. `queue` inputs form a FIFO of future activities -that open one at a time. A location-scoped `SessionRunCoordinator` coalesces process-local wakeups +that open one at a time. A process-global `SessionRunCoordinator` coalesces process-local wakeups around settlement races. Explicit `run` resumes perform at least one provider attempt; advisory `wake` notifications call the provider only for eligible inbox work. Steers coalesce into the active activity at @@ -55,7 +55,7 @@ Next reviewed slices: - integrate the new BackgroundJob service with V2 tool execution: support background bash jobs and background agent dispatch with durable status observation, completion delivery, and explicit cancellation / continuation semantics -- add compaction, interruption, retries, and stale-owner fencing +- add compaction, durable/clustered interruption, retries, and stale-owner fencing only as their slices become concrete ### Deferred durable activity recovery From 6a8ae0f7b971761becd1ec5e2016f6f9048457b6 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 5 Jun 2026 00:46:35 -0400 Subject: [PATCH 2/6] fix(core): separate interrupt successor waiters --- packages/core/src/session/run-coordinator.ts | 3 +- .../core/test/session-run-coordinator.test.ts | 42 +++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index a60010594323..4a2e63be6dbd 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -189,8 +189,7 @@ export const make = (options: { if (entry !== undefined) { if (entry.stopping) { entry.rerun = strongest(entry.rerun, "run") - entry.successorExplicit ??= - entry.mode === "wake" ? (entry.explicit ?? Deferred.makeUnsafe()) : Deferred.makeUnsafe() + entry.successorExplicit ??= Deferred.makeUnsafe() return restore(awaitRun(entry.successorExplicit)) } if (entry.mode === "wake") { diff --git a/packages/core/test/session-run-coordinator.test.ts b/packages/core/test/session-run-coordinator.test.ts index 124ecf42906b..3a1e15af3460 100644 --- a/packages/core/test/session-run-coordinator.test.ts +++ b/packages/core/test/session-run-coordinator.test.ts @@ -252,6 +252,48 @@ describe("SessionRunCoordinator", () => { ), ) + it.effect("separates pre-interrupt and post-interrupt explicit waiters", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const before = yield* coordinator.run("session").pipe(Effect.exit, Effect.forkChild) + const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.await(cleanupStarted) + const after = yield* coordinator.run("session").pipe(Effect.exit, Effect.forkChild) + yield* Deferred.succeed(cleanupGate, undefined) + + const beforeExit = yield* Fiber.join(before) + expect(Exit.isFailure(beforeExit) && Cause.hasInterruptsOnly(beforeExit.cause)).toBeTrue() + yield* Fiber.join(interrupt) + yield* Fiber.join(after) + yield* Deferred.await(secondStarted) + expect(runs).toBe(2) + }), + ), + ) + it.effect("waits for interrupt cleanup before settling callers", () => Effect.scoped( Effect.gen(function* () { From beab9487aa2cbeee6ca15a87454d29d0b7146c04 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 5 Jun 2026 10:21:21 -0400 Subject: [PATCH 3/6] fix(core): make interrupt a stop barrier --- packages/core/src/session/run-coordinator.ts | 15 +++---- .../core/test/session-run-coordinator.test.ts | 20 ++++++--- packages/core/test/session-runner.test.ts | 45 +++++++++++++++++++ 3 files changed, 65 insertions(+), 15 deletions(-) diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index 4a2e63be6dbd..c2fe3f34e7f8 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -19,8 +19,8 @@ export type Mode = "run" | "wake" * `wake` reports that durable work may now be available. It starts a chain while idle or * requests one coalesced follow-up while draining. Repeated wakes collapse together. * - * `interrupt` stops the current ownership chain. Wakes and explicit runs arriving after the - * interruption request become a fresh successor; previously queued reruns are suppressed. + * `interrupt` stops the current ownership chain. Advisory wakes received while stopping are + * suppressed; explicit runs wait for interruption cleanup before starting a fresh chain. */ export interface Coordinator { /** Starts or joins one explicit drain generation. */ @@ -29,7 +29,7 @@ export interface Coordinator { readonly wake: (key: Key) => Effect.Effect /** Waits until the current ownership chain settles. */ readonly awaitIdle: (key: Key) => Effect.Effect - /** Interrupts the active ownership chain. Later requests may start a fresh successor. */ + /** Interrupts the active ownership chain without automatically draining pending wakes. */ readonly interrupt: (key: Key) => Effect.Effect } @@ -39,7 +39,6 @@ type Entry = { mode: Mode rerun?: Mode explicit?: Deferred.Deferred - successorExplicit?: Deferred.Deferred owner?: Fiber.Fiber stopping: boolean } @@ -122,8 +121,7 @@ export const make = (options: { return } - const successorExplicit = entry.successorExplicit ?? (mode === "wake" ? entry.explicit : undefined) - const successor = entry.rerun !== undefined ? makeEntry(entry.rerun, successorExplicit) : undefined + const successor = !entry.stopping && entry.rerun !== undefined ? makeEntry(entry.rerun, entry.explicit) : undefined if (successor === undefined) active.delete(key) else active.set(key, successor) if (successor !== undefined) start(key, successor, successor.mode, true) @@ -144,6 +142,7 @@ export const make = (options: { if (closed) return const entry = active.get(key) if (entry !== undefined) { + if (entry.stopping) return entry.rerun = strongest(entry.rerun, "wake") return } @@ -188,9 +187,7 @@ export const make = (options: { const entry = active.get(key) if (entry !== undefined) { if (entry.stopping) { - entry.rerun = strongest(entry.rerun, "run") - entry.successorExplicit ??= Deferred.makeUnsafe() - return restore(awaitRun(entry.successorExplicit)) + return restore(Deferred.await(entry.settled).pipe(Effect.andThen(run(key)))) } if (entry.mode === "wake") { entry.rerun = "run" diff --git a/packages/core/test/session-run-coordinator.test.ts b/packages/core/test/session-run-coordinator.test.ts index 3a1e15af3460..a4ab15c58a8e 100644 --- a/packages/core/test/session-run-coordinator.test.ts +++ b/packages/core/test/session-run-coordinator.test.ts @@ -115,11 +115,12 @@ describe("SessionRunCoordinator", () => { ), ) - it.effect("allows a wake received after interruption to start a successor", () => + it.effect("suppresses a wake received during interruption cleanup", () => Effect.scoped( Effect.gen(function* () { const firstStarted = yield* Deferred.make() const firstInterrupted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() const secondStarted = yield* Deferred.make() let runs = 0 const coordinator = yield* SessionRunCoordinator.make({ @@ -129,7 +130,9 @@ describe("SessionRunCoordinator", () => { run === 1 ? Deferred.succeed(firstStarted, undefined).pipe( Effect.andThen(Effect.never), - Effect.onInterrupt(() => Deferred.succeed(firstInterrupted, undefined)), + Effect.onInterrupt(() => + Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), ) : Deferred.succeed(secondStarted, undefined), ), @@ -142,10 +145,15 @@ describe("SessionRunCoordinator", () => { yield* Effect.yieldNow yield* coordinator.wake("session") yield* Deferred.await(firstInterrupted) + expect(runs).toBe(1) + yield* Deferred.succeed(cleanupGate, undefined) yield* Fiber.join(interrupt) - yield* Deferred.await(secondStarted) yield* coordinator.awaitIdle("session") + expect(runs).toBe(1) + yield* coordinator.wake("session") + yield* Deferred.await(secondStarted) + yield* coordinator.awaitIdle("session") expect(runs).toBe(2) }), ), @@ -214,7 +222,7 @@ describe("SessionRunCoordinator", () => { ), ) - it.effect("routes an explicit run arriving during interrupt cleanup to the successor", () => + it.effect("starts an explicit run arriving during interrupt cleanup after the stop barrier", () => Effect.scoped( Effect.gen(function* () { const firstStarted = yield* Deferred.make() @@ -252,7 +260,7 @@ describe("SessionRunCoordinator", () => { ), ) - it.effect("separates pre-interrupt and post-interrupt explicit waiters", () => + it.effect("interrupts pre-stop waiters and runs post-stop waiters after cleanup", () => Effect.scoped( Effect.gen(function* () { const firstStarted = yield* Deferred.make() @@ -367,7 +375,7 @@ describe("SessionRunCoordinator", () => { ), ) - it.effect("does not discard a post-interrupt successor when interrupted again", () => + it.effect("does not discard a post-stop explicit run when interrupted again", () => Effect.scoped( Effect.gen(function* () { const firstStarted = yield* Deferred.make() diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 6d202cd2e042..645ebaff35d7 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -1567,6 +1567,7 @@ describe("SessionRunnerLLM", () => { yield* setup const session = yield* SessionV2.Service const coordinator = yield* SessionRunCoordinator.Service + const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false }) requests.length = 0 @@ -1591,6 +1592,7 @@ describe("SessionRunnerLLM", () => { yield* session.interrupt(sessionID) expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) expect(requests).toHaveLength(1) + expect(yield* SessionInput.hasPending(db, sessionID, "queue")).toBe(true) yield* coordinator.wake(sessionID) while (requests.length < 2) yield* Effect.yieldNow yield* Deferred.succeed(streamGate, undefined) @@ -1604,6 +1606,49 @@ describe("SessionRunnerLLM", () => { }), ) + it.effect("preserves durable steering input for a later resume after interruption", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + const { db } = yield* Database.Service + yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false }) + + requests.length = 0 + responses = [ + [], + [ + LLMEvent.stepStart({ index: 0 }), + LLMEvent.stepFinish({ index: 0, reason: "stop" }), + LLMEvent.finish({ reason: "stop" }), + ], + ] + streamGate = yield* Deferred.make() + streamStarted = yield* Deferred.make() + + const run = yield* session.resume(sessionID).pipe(Effect.forkChild) + yield* Deferred.await(streamStarted) + yield* session.prompt({ + sessionID, + prompt: new Prompt({ text: "Steer after interrupt" }), + }) + yield* session.interrupt(sessionID) + expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) + expect(requests).toHaveLength(1) + expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) + + const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild) + while (requests.length < 2) yield* Effect.yieldNow + yield* Deferred.succeed(streamGate, undefined) + yield* Fiber.join(resumed) + streamGate = undefined + streamStarted = undefined + + expect(requests).toHaveLength(2) + expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"]) + expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Steer after interrupt"]) + }), + ) + it.effect("runs queued active inputs as separate FIFO activities", () => Effect.gen(function* () { yield* setup From be5b5a0479a1628fa4ed5f521f772177d1baa584 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 5 Jun 2026 11:40:31 -0400 Subject: [PATCH 4/6] fix(core): persist v2 interrupt boundaries --- packages/core/src/session.ts | 27 ++- packages/core/src/session/event.ts | 8 + packages/core/src/session/execution.ts | 4 +- packages/core/src/session/message-updater.ts | 1 + packages/core/src/session/projector.ts | 1 + packages/core/src/session/run-coordinator.ts | 101 +++++++---- packages/core/test/session-prompt.test.ts | 35 +++- .../core/test/session-run-coordinator.test.ts | 161 +++++++++++++++++- packages/core/test/session-runner.test.ts | 5 +- 9 files changed, 295 insertions(+), 48 deletions(-) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index e6885891f93c..c6afb2ed5a1c 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -1,7 +1,7 @@ export * as SessionV2 from "./session" export * from "./session/schema" -import { Cause, Effect, Layer, Schema, Context, Stream } from "effect" +import { Cause, DateTime, Effect, Layer, Schema, Context, Stream } from "effect" import { and, asc, desc, eq, gt, like, lt, or, type SQL } from "drizzle-orm" import { ProjectV2 } from "./project" import { WorkspaceV2 } from "./workspace" @@ -172,13 +172,13 @@ export const layer = Layer.effect( const isDurableSessionEvent = Schema.is(SessionEvent.Durable) const scope = yield* Effect.scope - const enqueueWake = (sessionID: SessionSchema.ID) => - execution.wake(sessionID).pipe( + const enqueueWake = (admitted: SessionInput.Admitted) => + execution.wake(admitted.sessionID, admitted.admittedSeq).pipe( Effect.tapCause((cause) => Cause.hasInterruptsOnly(cause) ? Effect.void : Effect.logError("Failed to wake Session").pipe( - Effect.annotateLogs("sessionID", sessionID), + Effect.annotateLogs("sessionID", admitted.sessionID), Effect.annotateLogs("cause", cause), ), ), @@ -352,7 +352,7 @@ export const layer = Layer.effect( Effect.gen(function* () { yield* result.get(input.sessionID) const returnPrompt = Effect.fnUntraced(function* (admitted: SessionInput.Admitted) { - if (input.resume !== false) yield* enqueueWake(input.sessionID) + if (input.resume !== false) yield* enqueueWake(admitted) return admitted }, Effect.uninterruptible) const messageID = input.id ?? SessionMessage.ID.create() @@ -400,9 +400,20 @@ export const layer = Layer.effect( yield* result.get(sessionID) yield* execution.resume(sessionID) }), - interrupt: Effect.fn("V2Session.interrupt")(function* (sessionID) { - yield* execution.interrupt(sessionID) - }), + interrupt: Effect.fn("V2Session.interrupt")((sessionID) => + Effect.uninterruptible( + Effect.gen(function* () { + const session = yield* store.get(sessionID) + if (!session) return yield* execution.interrupt(sessionID) + const event = yield* events.publish(SessionEvent.InterruptRequested, { + sessionID, + timestamp: yield* DateTime.now, + }) + if (event.seq === undefined) return yield* Effect.die("Interrupt request event is missing aggregate sequence") + yield* execution.interrupt(sessionID, event.seq) + }), + ), + ), }) return result diff --git a/packages/core/src/session/event.ts b/packages/core/src/session/event.ts index 048ec6c62462..c7f6f6eb45f0 100644 --- a/packages/core/src/session/event.ts +++ b/packages/core/src/session/event.ts @@ -119,6 +119,13 @@ export namespace PromptLifecycle { export type Promoted = typeof Promoted.Type } +export const InterruptRequested = EventV2.define({ + type: "session.next.interrupt.requested", + ...options, + schema: Base, +}) +export type InterruptRequested = typeof InterruptRequested.Type + export const ContextUpdated = EventV2.define({ type: "session.next.context.updated", ...options, @@ -455,6 +462,7 @@ const DurableDefinitions = [ Prompted, PromptLifecycle.Admitted, PromptLifecycle.Promoted, + InterruptRequested, ContextUpdated, Synthetic, Shell.Started, diff --git a/packages/core/src/session/execution.ts b/packages/core/src/session/execution.ts index af422a9e2e98..9a99145bfb42 100644 --- a/packages/core/src/session/execution.ts +++ b/packages/core/src/session/execution.ts @@ -8,9 +8,9 @@ export interface Interface { /** Explicitly drain one Session, making at least one provider attempt. */ readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect /** Schedule a drain after durable work is recorded. Repeated wakeups may coalesce. */ - readonly wake: (sessionID: SessionSchema.ID) => Effect.Effect + readonly wake: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect /** Interrupt active work owned by this process. Idle interruption is a no-op. */ - readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect + readonly interrupt: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect } /** Routes execution from a Session ID to the runner owned by that Session's Location. */ diff --git a/packages/core/src/session/message-updater.ts b/packages/core/src/session/message-updater.ts index 38f52c21c4c3..be6cd51877a5 100644 --- a/packages/core/src/session/message-updater.ts +++ b/packages/core/src/session/message-updater.ts @@ -159,6 +159,7 @@ export function update(adapter: Adapter, event: SessionEvent.Event) { }, "session.next.prompt.admitted": () => Effect.void, "session.next.prompt.promoted": () => Effect.void, + "session.next.interrupt.requested": () => Effect.void, "session.next.context.updated": (event) => adapter.appendMessage( new SessionMessage.System({ diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index 42053ce733d3..dae0f7b1a26e 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -423,6 +423,7 @@ export const layer = Layer.effectDiscard( ) }), ) + yield* events.project(SessionEvent.InterruptRequested, () => Effect.void) yield* events.project(SessionEvent.ContextUpdated, (event) => { if (!event.replay || event.seq === undefined) return run(db, event) return run(db, event).pipe( diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index c2fe3f34e7f8..6c719694d2cf 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -19,26 +19,29 @@ export type Mode = "run" | "wake" * `wake` reports that durable work may now be available. It starts a chain while idle or * requests one coalesced follow-up while draining. Repeated wakes collapse together. * - * `interrupt` stops the current ownership chain. Advisory wakes received while stopping are - * suppressed; explicit runs wait for interruption cleanup before starting a fresh chain. + * `interrupt` stops the current ownership chain. Advisory wakes from before the interrupt + * boundary are suppressed; advisory wakes after the boundary run after cleanup. */ export interface Coordinator { /** Starts or joins one explicit drain generation. */ readonly run: (key: Key) => Effect.Effect /** Coalesces one wake-up after durable work is recorded. */ - readonly wake: (key: Key) => Effect.Effect + readonly wake: (key: Key, seq?: number) => Effect.Effect /** Waits until the current ownership chain settles. */ readonly awaitIdle: (key: Key) => Effect.Effect /** Interrupts the active ownership chain without automatically draining pending wakes. */ - readonly interrupt: (key: Key) => Effect.Effect + readonly interrupt: (key: Key, seq?: number) => Effect.Effect } type Entry = { readonly done: Deferred.Deferred readonly settled: Deferred.Deferred> mode: Mode - rerun?: Mode - explicit?: Deferred.Deferred + modeSeq?: number + nextMode?: Mode + nextSeq?: number + explicitWaiter?: Deferred.Deferred + interruptSeq?: number owner?: Fiber.Fiber stopping: boolean } @@ -52,6 +55,7 @@ export const make = (options: { }): Effect.Effect, never, Scope.Scope> => Effect.gen(function* () { const active = new Map>() + const interruptSeq = new Map() const report = yield* FiberSet.makeRuntime() const fork = yield* FiberSet.makeRuntime() const shutdown = Deferred.makeUnsafe() @@ -61,14 +65,16 @@ export const make = (options: { closed = true Deferred.doneUnsafe(shutdown, Effect.void) active.clear() + interruptSeq.clear() }), ) - const makeEntry = (mode: Mode, explicit?: Deferred.Deferred): Entry => ({ + const makeEntry = (mode: Mode, explicitWaiter?: Deferred.Deferred, modeSeq?: number): Entry => ({ done: Deferred.makeUnsafe(), settled: Deferred.makeUnsafe>(), mode, - explicit, + modeSeq, + explicitWaiter, stopping: false, }) @@ -94,13 +100,13 @@ export const make = (options: { Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) return } - if (mode === "run" && entry.explicit !== undefined) { - Deferred.doneUnsafe(entry.explicit, exit) - entry.explicit = undefined + if (mode === "run" && entry.explicitWaiter !== undefined) { + Deferred.doneUnsafe(entry.explicitWaiter, exit) + entry.explicitWaiter = undefined } - if (entry.stopping && mode === "wake" && entry.explicit !== undefined) { - Deferred.doneUnsafe(entry.explicit, exit) - entry.explicit = undefined + if (entry.stopping && mode === "wake" && entry.explicitWaiter !== undefined) { + Deferred.doneUnsafe(entry.explicitWaiter, exit) + entry.explicitWaiter = undefined } if (active.get(key) !== entry) { Deferred.doneUnsafe(entry.done, exit) @@ -108,10 +114,12 @@ export const make = (options: { return } if (exit._tag === "Success" && !entry.stopping) { - if (entry.rerun !== undefined) { - const mode = entry.rerun - entry.rerun = undefined + if (entry.nextMode !== undefined) { + const mode = entry.nextMode + entry.nextMode = undefined entry.mode = mode + entry.modeSeq = entry.nextSeq + entry.nextSeq = undefined start(key, entry, mode, true) return } @@ -121,7 +129,8 @@ export const make = (options: { return } - const successor = !entry.stopping && entry.rerun !== undefined ? makeEntry(entry.rerun, entry.explicit) : undefined + const successor = + entry.nextMode !== undefined ? makeEntry(entry.nextMode, entry.explicitWaiter, entry.nextSeq) : undefined if (successor === undefined) active.delete(key) else active.set(key, successor) if (successor !== undefined) start(key, successor, successor.mode, true) @@ -137,17 +146,19 @@ export const make = (options: { } } - const wake = (key: Key) => + const wake = (key: Key, seq?: number) => Effect.sync(() => { if (closed) return + if (!isAfterInterrupt(key, seq)) return const entry = active.get(key) if (entry !== undefined) { - if (entry.stopping) return - entry.rerun = strongest(entry.rerun, "wake") + if (!acceptsWake(entry, seq)) return + entry.nextMode = strongest(entry.nextMode, "wake") + entry.nextSeq = maxSeq(entry.nextSeq, seq) return } - const next = makeEntry("wake") + const next = makeEntry("wake", undefined, seq) active.set(key, next) start(key, next, "wake") }) @@ -168,14 +179,24 @@ export const make = (options: { if (firstFailure !== undefined) return yield* Effect.failCause(firstFailure) }) - const interrupt = (key: Key): Effect.Effect => + const interrupt = (key: Key, seq?: number): Effect.Effect => Effect.suspend(() => { const entry = active.get(key) + const latest = interruptSeq.get(key) + if (seq !== undefined && latest !== undefined && seq <= latest) + return entry?.stopping && entry.owner !== undefined ? Fiber.interrupt(entry.owner) : Effect.void + if (seq !== undefined) interruptSeq.set(key, seq) if (entry?.owner === undefined) return Effect.void - if (!entry.stopping) { - entry.stopping = true - entry.rerun = undefined + if (seq !== undefined && entry.mode === "wake" && entry.modeSeq !== undefined && entry.modeSeq > seq) + return Effect.void + if (entry.stopping) { + entry.interruptSeq = maxSeq(entry.interruptSeq, seq) + suppressNextBefore(entry, seq) + return Fiber.interrupt(entry.owner) } + entry.stopping = true + entry.interruptSeq = seq + suppressNextBefore(entry, seq) return Fiber.interrupt(entry.owner) }) @@ -190,9 +211,10 @@ export const make = (options: { return restore(Deferred.await(entry.settled).pipe(Effect.andThen(run(key)))) } if (entry.mode === "wake") { - entry.rerun = "run" - entry.explicit ??= Deferred.makeUnsafe() - return restore(awaitRun(entry.explicit)) + entry.nextMode = "run" + entry.nextSeq = undefined + entry.explicitWaiter ??= Deferred.makeUnsafe() + return restore(awaitRun(entry.explicitWaiter)) } return restore(awaitRun(entry.done)) } @@ -207,6 +229,27 @@ export const make = (options: { function awaitRun(done: Deferred.Deferred): Effect.Effect { return Effect.raceFirst(Deferred.await(done), Deferred.await(shutdown).pipe(Effect.andThen(Effect.interrupt))) } + + function acceptsWake(entry: Entry, seq: number | undefined) { + return !entry.stopping || (entry.interruptSeq !== undefined && seq !== undefined && seq > entry.interruptSeq) + } + + function isAfterInterrupt(key: Key, seq: number | undefined) { + const latest = interruptSeq.get(key) + return latest === undefined || (seq !== undefined && seq > latest) + } + + function maxSeq(left: number | undefined, right: number | undefined) { + if (left === undefined) return right + if (right === undefined) return left + return Math.max(left, right) + } + + function suppressNextBefore(entry: Entry, seq: number | undefined) { + if (entry.nextMode === "wake" && seq !== undefined && entry.nextSeq !== undefined && entry.nextSeq > seq) return + entry.nextMode = undefined + entry.nextSeq = undefined + } }) export interface Interface extends Coordinator {} diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index 1d46205e1871..b58761ab5b03 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -24,7 +24,9 @@ const projector = SessionProjector.layer.pipe(Layer.provide(events), Layer.provi const store = SessionStore.layer.pipe(Layer.provide(database)) const executionCalls: SessionV2.ID[] = [] const interruptCalls: SessionV2.ID[] = [] +const interruptSeqs: Array = [] const wakeCalls: SessionV2.ID[] = [] +const wakeSeqs: Array = [] const execution = Layer.succeed( SessionExecution.Service, SessionExecution.Service.of({ @@ -32,13 +34,15 @@ const execution = Layer.succeed( Effect.sync(() => { executionCalls.push(sessionID) }), - interrupt: (sessionID) => + interrupt: (sessionID, seq) => Effect.sync(() => { interruptCalls.push(sessionID) + interruptSeqs.push(seq) }), - wake: (sessionID) => + wake: (sessionID, seq) => Effect.sync(() => { wakeCalls.push(sessionID) + wakeSeqs.push(seq) }), }), ) @@ -100,6 +104,15 @@ const eventCount = (type: string) => ), ) +const interruptEvent = Database.Service.use(({ db }) => + db + .select() + .from(EventTable) + .where(eq(EventTable.type, "session.next.interrupt.requested.1")) + .get() + .pipe(Effect.orDie), +) + describe("SessionV2.prompt", () => { it.effect("delegates execution continuation through SessionExecution", () => Effect.gen(function* () { @@ -118,9 +131,15 @@ describe("SessionV2.prompt", () => { yield* setup const session = yield* SessionV2.Service interruptCalls.length = 0 + interruptSeqs.length = 0 yield* session.interrupt(sessionID) expect(interruptCalls).toEqual([sessionID]) + expect(interruptSeqs).toHaveLength(1) + expect(typeof interruptSeqs[0]).toBe("number") + expect(yield* eventCount("session.next.interrupt.requested.1")).toBe(1) + expect(yield* interruptEvent).toMatchObject({ aggregate_id: sessionID, seq: interruptSeqs[0] }) + expect(yield* session.messages({ sessionID })).toEqual([]) }), ) @@ -128,9 +147,11 @@ describe("SessionV2.prompt", () => { Effect.gen(function* () { const session = yield* SessionV2.Service interruptCalls.length = 0 + interruptSeqs.length = 0 yield* session.interrupt(SessionV2.ID.make("ses_missing")) expect(interruptCalls).toEqual([SessionV2.ID.make("ses_missing")]) + expect(interruptSeqs).toEqual([undefined]) }), ) @@ -539,11 +560,13 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service executionCalls.length = 0 wakeCalls.length = 0 + wakeSeqs.length = 0 - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) }) + const admitted = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) }) expect(executionCalls).toEqual([]) expect(wakeCalls).toEqual([sessionID]) + expect(wakeSeqs).toEqual([admitted.admittedSeq]) }), ) @@ -553,11 +576,13 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service executionCalls.length = 0 wakeCalls.length = 0 + wakeSeqs.length = 0 - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run explicitly" }), resume: true }) + const admitted = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run explicitly" }), resume: true }) expect(executionCalls).toEqual([]) expect(wakeCalls).toEqual([sessionID]) + expect(wakeSeqs).toEqual([admitted.admittedSeq]) }), ) @@ -567,11 +592,13 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service executionCalls.length = 0 wakeCalls.length = 0 + wakeSeqs.length = 0 yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Do not run" }), resume: false }) expect(executionCalls).toEqual([]) expect(wakeCalls).toEqual([]) + expect(wakeSeqs).toEqual([]) }), ) }) diff --git a/packages/core/test/session-run-coordinator.test.ts b/packages/core/test/session-run-coordinator.test.ts index a4ab15c58a8e..a4adf54643dd 100644 --- a/packages/core/test/session-run-coordinator.test.ts +++ b/packages/core/test/session-run-coordinator.test.ts @@ -51,6 +51,77 @@ describe("SessionRunCoordinator", () => { ), ) + it.effect("suppresses stale wakes after an idle interrupt boundary", () => + Effect.scoped( + Effect.gen(function* () { + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.sync(() => runs++) }) + + yield* coordinator.interrupt("session", 2) + yield* coordinator.wake("session", 1) + yield* coordinator.awaitIdle("session") + expect(runs).toBe(0) + + yield* coordinator.wake("session", 3) + yield* coordinator.awaitIdle("session") + expect(runs).toBe(1) + }), + ), + ) + + it.effect("does not interrupt a wake newer than the interrupt boundary", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const gate = yield* Deferred.make() + const interrupted = yield* Deferred.make() + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Deferred.succeed(started, undefined).pipe( + Effect.andThen(Deferred.await(gate)), + Effect.onInterrupt(() => Deferred.succeed(interrupted, undefined)), + ), + }) + + yield* coordinator.wake("session", 3) + yield* Deferred.await(started) + yield* coordinator.interrupt("session", 2) + expect(yield* Deferred.isDone(interrupted)).toBeFalse() + yield* Deferred.succeed(gate, undefined) + yield* coordinator.awaitIdle("session") + }), + ), + ) + + it.effect("preserves a queued wake newer than the interrupt boundary", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never)) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + + yield* coordinator.wake("session", 1) + yield* Deferred.await(firstStarted) + yield* coordinator.wake("session", 3) + yield* coordinator.interrupt("session", 2) + yield* Deferred.await(secondStarted) + yield* coordinator.awaitIdle("session").pipe(Effect.exit) + + expect(runs).toBe(2) + }), + ), + ) + it.effect("interrupts only the requested key", () => Effect.scoped( Effect.gen(function* () { @@ -141,9 +212,9 @@ describe("SessionRunCoordinator", () => { yield* coordinator.wake("session") yield* Deferred.await(firstStarted) - const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + const interrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild) yield* Effect.yieldNow - yield* coordinator.wake("session") + yield* coordinator.wake("session", 1) yield* Deferred.await(firstInterrupted) expect(runs).toBe(1) yield* Deferred.succeed(cleanupGate, undefined) @@ -151,7 +222,93 @@ describe("SessionRunCoordinator", () => { yield* coordinator.awaitIdle("session") expect(runs).toBe(1) + yield* coordinator.wake("session", 3) + yield* Deferred.await(secondStarted) + yield* coordinator.awaitIdle("session") + expect(runs).toBe(2) + }), + ), + ) + + it.effect("remembers a wake received after the interrupt boundary during cleanup", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const firstInterrupted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const interrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild) + yield* Deferred.await(firstInterrupted) + yield* coordinator.wake("session", 3) + const staleInterrupt = yield* coordinator.interrupt("session", 1).pipe(Effect.forkChild) + expect(runs).toBe(1) + yield* Deferred.succeed(cleanupGate, undefined) + yield* Fiber.join(interrupt) + yield* Fiber.join(staleInterrupt) + yield* Deferred.await(secondStarted) + yield* coordinator.awaitIdle("session") + + expect(runs).toBe(2) + }), + ), + ) + + it.effect("moves the stop barrier forward for repeated interrupts", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const firstInterrupted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const firstInterrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild) + yield* Deferred.await(firstInterrupted) + yield* coordinator.wake("session", 3) + const secondInterrupt = yield* coordinator.interrupt("session", 4).pipe(Effect.forkChild) + yield* Deferred.succeed(cleanupGate, undefined) + yield* Fiber.join(firstInterrupt) + yield* Fiber.join(secondInterrupt) + yield* coordinator.awaitIdle("session") + expect(runs).toBe(1) + + yield* coordinator.wake("session", 5) yield* Deferred.await(secondStarted) yield* coordinator.awaitIdle("session") expect(runs).toBe(2) diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 645ebaff35d7..63faef0b3ae6 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -1566,7 +1566,6 @@ describe("SessionRunnerLLM", () => { Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service - const coordinator = yield* SessionRunCoordinator.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false }) @@ -1593,10 +1592,10 @@ describe("SessionRunnerLLM", () => { expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) expect(requests).toHaveLength(1) expect(yield* SessionInput.hasPending(db, sessionID, "queue")).toBe(true) - yield* coordinator.wake(sessionID) + const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild) while (requests.length < 2) yield* Effect.yieldNow yield* Deferred.succeed(streamGate, undefined) - yield* coordinator.awaitIdle(sessionID) + yield* Fiber.join(resumed) streamGate = undefined streamStarted = undefined From c573a347542904bdb5c466e4714f1889ed3a88e1 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 5 Jun 2026 11:53:51 -0400 Subject: [PATCH 5/6] refactor(core): model session run demand --- packages/core/src/session/run-coordinator.ts | 91 ++++++++++---------- 1 file changed, 44 insertions(+), 47 deletions(-) diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index 6c719694d2cf..bd2b5fcf34a4 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -6,6 +6,8 @@ import { SessionSchema } from "./schema" export type Mode = "run" | "wake" +type Demand = { readonly _tag: "run" } | { readonly _tag: "wake"; readonly seq?: number } + /** * Runs at most one drain chain per key while allowing different keys to drain concurrently. * @@ -36,17 +38,24 @@ export interface Coordinator { type Entry = { readonly done: Deferred.Deferred readonly settled: Deferred.Deferred> - mode: Mode - modeSeq?: number - nextMode?: Mode - nextSeq?: number + current: Demand + pending?: Demand explicitWaiter?: Deferred.Deferred interruptSeq?: number owner?: Fiber.Fiber stopping: boolean } -const strongest = (left: Mode | undefined, right: Mode): Mode => (left === "run" || right === "run" ? "run" : "wake") +const coalesce = (left: Demand | undefined, right: Demand): Demand => { + if (left?._tag === "run" || right._tag === "run") return { _tag: "run" } + return { _tag: "wake", seq: maxSeq(left?.seq, right.seq) } +} + +const maxSeq = (left: number | undefined, right: number | undefined) => { + if (left === undefined) return right + if (right === undefined) return left + return Math.max(left, right) +} /** Constructs a scoped coordinator. Every in-memory transition is synchronous. */ export const make = (options: { @@ -69,23 +78,22 @@ export const make = (options: { }), ) - const makeEntry = (mode: Mode, explicitWaiter?: Deferred.Deferred, modeSeq?: number): Entry => ({ + const makeEntry = (current: Demand, explicitWaiter?: Deferred.Deferred): Entry => ({ done: Deferred.makeUnsafe(), settled: Deferred.makeUnsafe>(), - mode, - modeSeq, + current, explicitWaiter, stopping: false, }) - const start = (key: Key, entry: Entry, mode: Mode, successor = false) => { + const start = (key: Key, entry: Entry, demand: Demand, successor = false) => { const ready = Deferred.makeUnsafe() - const drain = Effect.suspend(() => options.drain(key, mode)) + const drain = Effect.suspend(() => options.drain(key, demand._tag)) // Initial work retains immediate-start behavior but cannot run before ownership is published. // Observer-started successors yield once so synchronous drains cannot recurse on the JS stack. const owner = fork( (successor ? Effect.yieldNow.pipe(Effect.andThen(drain)) : Deferred.await(ready).pipe(Effect.andThen(drain))).pipe( - Effect.onExit((exit) => Effect.sync(() => settle(key, entry, mode, exit))), + Effect.onExit((exit) => Effect.sync(() => settle(key, entry, demand, exit))), Effect.exit, Effect.asVoid, ), @@ -94,17 +102,17 @@ export const make = (options: { if (!successor) Deferred.doneUnsafe(ready, Effect.void) } - const settle = (key: Key, entry: Entry, mode: Mode, exit: Exit.Exit) => { + const settle = (key: Key, entry: Entry, demand: Demand, exit: Exit.Exit) => { if (closed) { Deferred.doneUnsafe(entry.done, exit) Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) return } - if (mode === "run" && entry.explicitWaiter !== undefined) { + if (demand._tag === "run" && entry.explicitWaiter !== undefined) { Deferred.doneUnsafe(entry.explicitWaiter, exit) entry.explicitWaiter = undefined } - if (entry.stopping && mode === "wake" && entry.explicitWaiter !== undefined) { + if (entry.stopping && demand._tag === "wake" && entry.explicitWaiter !== undefined) { Deferred.doneUnsafe(entry.explicitWaiter, exit) entry.explicitWaiter = undefined } @@ -114,13 +122,11 @@ export const make = (options: { return } if (exit._tag === "Success" && !entry.stopping) { - if (entry.nextMode !== undefined) { - const mode = entry.nextMode - entry.nextMode = undefined - entry.mode = mode - entry.modeSeq = entry.nextSeq - entry.nextSeq = undefined - start(key, entry, mode, true) + if (entry.pending !== undefined) { + const pending = entry.pending + entry.pending = undefined + entry.current = pending + start(key, entry, pending, true) return } active.delete(key) @@ -129,17 +135,16 @@ export const make = (options: { return } - const successor = - entry.nextMode !== undefined ? makeEntry(entry.nextMode, entry.explicitWaiter, entry.nextSeq) : undefined + const successor = entry.pending !== undefined ? makeEntry(entry.pending, entry.explicitWaiter) : undefined if (successor === undefined) active.delete(key) else active.set(key, successor) - if (successor !== undefined) start(key, successor, successor.mode, true) + if (successor !== undefined) start(key, successor, successor.current, true) Deferred.doneUnsafe(entry.done, exit) Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) if ( exit._tag === "Failure" && !(entry.stopping && Cause.hasInterruptsOnly(exit.cause)) && - mode === "wake" && + demand._tag === "wake" && options.onFailure !== undefined ) { report(Effect.suspend(() => options.onFailure!(key, exit.cause))) @@ -153,14 +158,13 @@ export const make = (options: { const entry = active.get(key) if (entry !== undefined) { if (!acceptsWake(entry, seq)) return - entry.nextMode = strongest(entry.nextMode, "wake") - entry.nextSeq = maxSeq(entry.nextSeq, seq) + entry.pending = coalesce(entry.pending, { _tag: "wake", seq }) return } - const next = makeEntry("wake", undefined, seq) + const next = makeEntry({ _tag: "wake", seq }) active.set(key, next) - start(key, next, "wake") + start(key, next, next.current) }) const awaitIdle = (key: Key): Effect.Effect => @@ -187,16 +191,16 @@ export const make = (options: { return entry?.stopping && entry.owner !== undefined ? Fiber.interrupt(entry.owner) : Effect.void if (seq !== undefined) interruptSeq.set(key, seq) if (entry?.owner === undefined) return Effect.void - if (seq !== undefined && entry.mode === "wake" && entry.modeSeq !== undefined && entry.modeSeq > seq) + if (seq !== undefined && entry.current._tag === "wake" && entry.current.seq !== undefined && entry.current.seq > seq) return Effect.void if (entry.stopping) { entry.interruptSeq = maxSeq(entry.interruptSeq, seq) - suppressNextBefore(entry, seq) + suppressPendingAtOrBefore(entry, seq) return Fiber.interrupt(entry.owner) } entry.stopping = true entry.interruptSeq = seq - suppressNextBefore(entry, seq) + suppressPendingAtOrBefore(entry, seq) return Fiber.interrupt(entry.owner) }) @@ -210,18 +214,17 @@ export const make = (options: { if (entry.stopping) { return restore(Deferred.await(entry.settled).pipe(Effect.andThen(run(key)))) } - if (entry.mode === "wake") { - entry.nextMode = "run" - entry.nextSeq = undefined + if (entry.current._tag === "wake") { + entry.pending = coalesce(entry.pending, { _tag: "run" }) entry.explicitWaiter ??= Deferred.makeUnsafe() return restore(awaitRun(entry.explicitWaiter)) } return restore(awaitRun(entry.done)) } - const next = makeEntry("run") + const next = makeEntry({ _tag: "run" }) active.set(key, next) - start(key, next, "run") + start(key, next, next.current) return restore(awaitRun(next.done)) }) } @@ -239,16 +242,10 @@ export const make = (options: { return latest === undefined || (seq !== undefined && seq > latest) } - function maxSeq(left: number | undefined, right: number | undefined) { - if (left === undefined) return right - if (right === undefined) return left - return Math.max(left, right) - } - - function suppressNextBefore(entry: Entry, seq: number | undefined) { - if (entry.nextMode === "wake" && seq !== undefined && entry.nextSeq !== undefined && entry.nextSeq > seq) return - entry.nextMode = undefined - entry.nextSeq = undefined + function suppressPendingAtOrBefore(entry: Entry, seq: number | undefined) { + if (entry.pending?._tag === "wake" && seq !== undefined && entry.pending.seq !== undefined && entry.pending.seq > seq) + return + entry.pending = undefined } }) From 09e46b05fba2b0eb4ab7b9853bc3c45aca006f4a Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 5 Jun 2026 11:54:41 -0400 Subject: [PATCH 6/6] docs(core): explain session run demand --- packages/core/src/session/run-coordinator.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index bd2b5fcf34a4..ecf47ac009f0 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -6,6 +6,7 @@ import { SessionSchema } from "./schema" export type Mode = "run" | "wake" +/** Why one drain generation should run. Explicit runs dominate advisory wakes when demands coalesce. */ type Demand = { readonly _tag: "run" } | { readonly _tag: "wake"; readonly seq?: number } /** @@ -35,6 +36,7 @@ export interface Coordinator { readonly interrupt: (key: Key, seq?: number) => Effect.Effect } +/** One Session's process-local execution lane: one active demand and at most one coalesced follow-up. */ type Entry = { readonly done: Deferred.Deferred readonly settled: Deferred.Deferred> @@ -46,6 +48,7 @@ type Entry = { stopping: boolean } +/** Combines follow-up demand: runs dominate, while wakes retain the newest durable admission sequence. */ const coalesce = (left: Demand | undefined, right: Demand): Demand => { if (left?._tag === "run" || right._tag === "run") return { _tag: "run" } return { _tag: "wake", seq: maxSeq(left?.seq, right.seq) }