Skip to content

Commit 3938930

Browse files
juliusmarmingecodex
andcommitted
perf(server): bootstrap engine from persisted projections
Co-authored-by: codex <codex@users.noreply.github.com>
1 parent f23d141 commit 3938930

File tree

6 files changed

+117
-8
lines changed

6 files changed

+117
-8
lines changed

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { GitCoreLive } from "../../git/Layers/GitCore.ts";
2323
import { CheckpointReactorLive } from "./CheckpointReactor.ts";
2424
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
2525
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
26+
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
2627
import { RuntimeReceiptBusLive } from "./RuntimeReceiptBus.ts";
2728
import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts";
2829
import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts";
@@ -249,6 +250,7 @@ describe("CheckpointReactor", () => {
249250
options?.providerName ?? "codex",
250251
);
251252
const orchestrationLayer = OrchestrationEngineLive.pipe(
253+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
252254
Layer.provide(OrchestrationProjectionPipelineLive),
253255
Layer.provide(OrchestrationEventStoreLive),
254256
Layer.provide(OrchestrationCommandReceiptRepositoryLive),

apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
TurnId,
99
type OrchestrationEvent,
1010
} from "@t3tools/contracts";
11-
import { Effect, Layer, ManagedRuntime, Queue, Stream } from "effect";
11+
import { Effect, Layer, ManagedRuntime, Option, Queue, Stream } from "effect";
1212
import { describe, expect, it } from "vitest";
1313

1414
import { PersistenceSqlError } from "../../persistence/Errors.ts";
@@ -21,11 +21,13 @@ import {
2121
} from "../../persistence/Services/OrchestrationEventStore.ts";
2222
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
2323
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
24+
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
2425
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
2526
import {
2627
OrchestrationProjectionPipeline,
2728
type OrchestrationProjectionPipelineShape,
2829
} from "../Services/ProjectionPipeline.ts";
30+
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
2931
import { ServerConfig } from "../../config.ts";
3032
import * as NodeServices from "@effect/platform-node/NodeServices";
3133

@@ -39,6 +41,7 @@ async function createOrchestrationSystem() {
3941
prefix: "t3-orchestration-engine-test-",
4042
});
4143
const orchestrationLayer = OrchestrationEngineLive.pipe(
44+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
4245
Layer.provide(OrchestrationProjectionPipelineLive),
4346
Layer.provide(OrchestrationEventStoreLive),
4447
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
@@ -60,6 +63,105 @@ function now() {
6063
}
6164

6265
describe("OrchestrationEngine", () => {
66+
it("bootstraps the in-memory read model from persisted projections", async () => {
67+
const failOnHistoricalReplayStore: OrchestrationEventStoreShape = {
68+
append: () =>
69+
Effect.fail(
70+
new PersistenceSqlError({
71+
operation: "test.append",
72+
detail: "append should not be called during bootstrap",
73+
}),
74+
),
75+
readFromSequence: () => Stream.empty,
76+
readAll: () =>
77+
Stream.fail(
78+
new PersistenceSqlError({
79+
operation: "test.readAll",
80+
detail: "historical replay should not be used during bootstrap",
81+
}),
82+
),
83+
};
84+
85+
const projectionSnapshot = {
86+
snapshotSequence: 7,
87+
updatedAt: "2026-03-03T00:00:04.000Z",
88+
projects: [
89+
{
90+
id: asProjectId("project-bootstrap"),
91+
title: "Bootstrap Project",
92+
workspaceRoot: "/tmp/project-bootstrap",
93+
defaultModelSelection: {
94+
provider: "codex" as const,
95+
model: "gpt-5-codex",
96+
},
97+
scripts: [],
98+
createdAt: "2026-03-03T00:00:00.000Z",
99+
updatedAt: "2026-03-03T00:00:01.000Z",
100+
deletedAt: null,
101+
},
102+
],
103+
threads: [
104+
{
105+
id: ThreadId.makeUnsafe("thread-bootstrap"),
106+
projectId: asProjectId("project-bootstrap"),
107+
title: "Bootstrap Thread",
108+
modelSelection: {
109+
provider: "codex" as const,
110+
model: "gpt-5-codex",
111+
},
112+
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
113+
runtimeMode: "full-access" as const,
114+
branch: null,
115+
worktreePath: null,
116+
latestTurn: null,
117+
createdAt: "2026-03-03T00:00:02.000Z",
118+
updatedAt: "2026-03-03T00:00:03.000Z",
119+
archivedAt: null,
120+
deletedAt: null,
121+
messages: [],
122+
proposedPlans: [],
123+
activities: [],
124+
checkpoints: [],
125+
session: null,
126+
},
127+
],
128+
};
129+
130+
const layer = OrchestrationEngineLive.pipe(
131+
Layer.provide(
132+
Layer.succeed(ProjectionSnapshotQuery, {
133+
getSnapshot: () => Effect.succeed(projectionSnapshot),
134+
getCounts: () => Effect.succeed({ projectCount: 1, threadCount: 1 }),
135+
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
136+
getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()),
137+
getThreadCheckpointContext: () => Effect.succeed(Option.none()),
138+
}),
139+
),
140+
Layer.provide(
141+
Layer.succeed(OrchestrationProjectionPipeline, {
142+
bootstrap: Effect.void,
143+
projectEvent: () => Effect.void,
144+
} satisfies OrchestrationProjectionPipelineShape),
145+
),
146+
Layer.provide(Layer.succeed(OrchestrationEventStore, failOnHistoricalReplayStore)),
147+
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
148+
Layer.provide(SqlitePersistenceMemory),
149+
);
150+
151+
const runtime = ManagedRuntime.make(layer);
152+
153+
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
154+
const readModel = await runtime.runPromise(engine.getReadModel());
155+
156+
expect(readModel.snapshotSequence).toBe(7);
157+
expect(readModel.projects).toHaveLength(1);
158+
expect(readModel.projects[0]?.title).toBe("Bootstrap Project");
159+
expect(readModel.threads).toHaveLength(1);
160+
expect(readModel.threads[0]?.title).toBe("Bootstrap Thread");
161+
162+
await runtime.dispose();
163+
});
164+
63165
it("returns deterministic read models for repeated reads", async () => {
64166
const createdAt = now();
65167
const system = await createOrchestrationSystem();
@@ -417,6 +519,7 @@ describe("OrchestrationEngine", () => {
417519

418520
const runtime = ManagedRuntime.make(
419521
OrchestrationEngineLive.pipe(
522+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
420523
Layer.provide(OrchestrationProjectionPipelineLive),
421524
Layer.provide(Layer.succeed(OrchestrationEventStore, flakyStore)),
422525
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
@@ -512,6 +615,7 @@ describe("OrchestrationEngine", () => {
512615

513616
const runtime = ManagedRuntime.make(
514617
OrchestrationEngineLive.pipe(
618+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
515619
Layer.provide(Layer.succeed(OrchestrationProjectionPipeline, flakyProjectionPipeline)),
516620
Layer.provide(OrchestrationEventStoreLive),
517621
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
@@ -653,6 +757,7 @@ describe("OrchestrationEngine", () => {
653757

654758
const runtime = ManagedRuntime.make(
655759
OrchestrationEngineLive.pipe(
760+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
656761
Layer.provide(Layer.succeed(OrchestrationProjectionPipeline, flakyProjectionPipeline)),
657762
Layer.provide(Layer.succeed(OrchestrationEventStore, nonTransactionalStore)),
658763
Layer.provide(OrchestrationCommandReceiptRepositoryLive),

apps/server/src/orchestration/Layers/OrchestrationEngine.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
import { decideOrchestrationCommand } from "../decider.ts";
2020
import { createEmptyReadModel, projectEvent } from "../projector.ts";
2121
import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts";
22+
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
2223
import {
2324
OrchestrationEngineService,
2425
type OrchestrationEngineShape,
@@ -54,6 +55,7 @@ const makeOrchestrationEngine = Effect.gen(function* () {
5455
const eventStore = yield* OrchestrationEventStore;
5556
const commandReceiptRepository = yield* OrchestrationCommandReceiptRepository;
5657
const projectionPipeline = yield* OrchestrationProjectionPipeline;
58+
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;
5759

5860
let readModel = createEmptyReadModel(new Date().toISOString());
5961

@@ -195,13 +197,7 @@ const makeOrchestrationEngine = Effect.gen(function* () {
195197
};
196198

197199
yield* projectionPipeline.bootstrap;
198-
199-
// bootstrap in-memory read model from event store
200-
yield* Stream.runForEach(eventStore.readAll(), (event) =>
201-
Effect.gen(function* () {
202-
readModel = yield* projectEvent(readModel, event);
203-
}),
204-
);
200+
readModel = yield* projectionSnapshotQuery.getSnapshot();
205201

206202
const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope)));
207203
yield* Effect.forkScoped(worker);

apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
ORCHESTRATION_PROJECTOR_NAMES,
2626
OrchestrationProjectionPipelineLive,
2727
} from "./ProjectionPipeline.ts";
28+
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
2829
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
2930
import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts";
3031
import { ServerConfig } from "../../config.ts";
@@ -1841,6 +1842,7 @@ it.effect("restores pending turn-start metadata across projection pipeline resta
18411842

18421843
const engineLayer = it.layer(
18431844
OrchestrationEngineLive.pipe(
1845+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
18441846
Layer.provide(OrchestrationProjectionPipelineLive),
18451847
Layer.provide(OrchestrationEventStoreLive),
18461848
Layer.provide(OrchestrationCommandReceiptRepositoryLive),

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import { GitCore, type GitCoreShape } from "../../git/Services/GitCore.ts";
3030
import { TextGeneration, type TextGenerationShape } from "../../git/Services/TextGeneration.ts";
3131
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
3232
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
33+
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
3334
import { ProviderCommandReactorLive } from "./ProviderCommandReactor.ts";
3435
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
3536
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
@@ -211,6 +212,7 @@ describe("ProviderCommandReactor", () => {
211212
};
212213

213214
const orchestrationLayer = OrchestrationEngineLive.pipe(
215+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
214216
Layer.provide(OrchestrationProjectionPipelineLive),
215217
Layer.provide(OrchestrationEventStoreLive),
216218
Layer.provide(OrchestrationCommandReceiptRepositoryLive),

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
} from "../../provider/Services/ProviderService.ts";
3232
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
3333
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
34+
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
3435
import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts";
3536
import {
3637
OrchestrationEngineService,
@@ -197,6 +198,7 @@ describe("ProviderRuntimeIngestion", () => {
197198
fs.mkdirSync(path.join(workspaceRoot, ".git"));
198199
const provider = createProviderServiceHarness();
199200
const orchestrationLayer = OrchestrationEngineLive.pipe(
201+
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
200202
Layer.provide(OrchestrationProjectionPipelineLive),
201203
Layer.provide(OrchestrationEventStoreLive),
202204
Layer.provide(OrchestrationCommandReceiptRepositoryLive),

0 commit comments

Comments
 (0)