Skip to content

Commit 1cdfb50

Browse files
Apply PR #18173: feat(bus): migrate Bus to Effect service with PubSub
2 parents a1024be + 8a3aa94 commit 1cdfb50

File tree

12 files changed

+570
-133
lines changed

12 files changed

+570
-133
lines changed

packages/opencode/src/bus/bus-event.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import z from "zod"
2-
import type { ZodType } from "zod"
2+
import type { ZodObject, ZodRawShape } from "zod"
33
import { Log } from "../util/log"
44

55
export namespace BusEvent {
@@ -9,7 +9,7 @@ export namespace BusEvent {
99

1010
const registry = new Map<string, Definition>()
1111

12-
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
12+
export function define<Type extends string, Properties extends ZodObject<ZodRawShape>>(type: Type, properties: Properties) {
1313
const result = {
1414
type,
1515
properties,

packages/opencode/src/bus/global.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{
44
event: [
55
{
66
directory?: string
7-
payload: any
7+
payload: { type: string; properties: Record<string, unknown> }
88
},
99
]
1010
}>()

packages/opencode/src/bus/index.ts

Lines changed: 110 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import z from "zod"
2+
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
23
import { Log } from "../util/log"
34
import { Instance } from "../project/instance"
45
import { BusEvent } from "./bus-event"
56
import { GlobalBus } from "./global"
7+
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
68

79
export namespace Bus {
810
const log = Log.create({ service: "bus" })
9-
type Subscription = (event: any) => void
1011

1112
export const InstanceDisposed = BusEvent.define(
1213
"server.instance.disposed",
@@ -15,91 +16,130 @@ export namespace Bus {
1516
}),
1617
)
1718

18-
const state = Instance.state(
19-
() => {
20-
const subscriptions = new Map<any, Subscription[]>()
19+
// ---------------------------------------------------------------------------
20+
// Service definition
21+
// ---------------------------------------------------------------------------
2122

22-
return {
23-
subscriptions,
23+
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
24+
type: D["type"]
25+
properties: z.infer<D["properties"]>
26+
}
27+
28+
export interface Interface {
29+
readonly publish: <D extends BusEvent.Definition>(
30+
def: D,
31+
properties: z.output<D["properties"]>,
32+
) => Effect.Effect<void>
33+
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
34+
readonly subscribeAll: () => Stream.Stream<Payload>
35+
}
36+
37+
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
38+
39+
export const layer = Layer.effect(
40+
Service,
41+
Effect.gen(function* () {
42+
const pubsubs = new Map<string, PubSub.PubSub<Payload>>()
43+
const wildcardPubSub = yield* PubSub.unbounded<Payload>()
44+
45+
const getOrCreate = Effect.fnUntraced(function* (type: string) {
46+
let ps = pubsubs.get(type)
47+
if (!ps) {
48+
ps = yield* PubSub.unbounded<Payload>()
49+
pubsubs.set(type, ps)
50+
}
51+
return ps
52+
})
53+
54+
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
55+
return Effect.gen(function* () {
56+
const payload: Payload = { type: def.type, properties }
57+
log.info("publishing", { type: def.type })
58+
59+
const ps = pubsubs.get(def.type)
60+
if (ps) yield* PubSub.publish(ps, payload)
61+
yield* PubSub.publish(wildcardPubSub, payload)
62+
63+
GlobalBus.emit("event", {
64+
directory: Instance.directory,
65+
payload,
66+
})
67+
})
2468
}
25-
},
26-
async (entry) => {
27-
const wildcard = entry.subscriptions.get("*")
28-
if (!wildcard) return
29-
const event = {
30-
type: InstanceDisposed.type,
31-
properties: {
32-
directory: Instance.directory,
33-
},
69+
70+
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
71+
log.info("subscribing", { type: def.type })
72+
return Stream.unwrap(
73+
Effect.gen(function* () {
74+
const ps = yield* getOrCreate(def.type)
75+
return Stream.fromPubSub(ps) as Stream.Stream<Payload<D>>
76+
}),
77+
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
3478
}
35-
for (const sub of [...wildcard]) {
36-
sub(event)
79+
80+
function subscribeAll(): Stream.Stream<Payload> {
81+
log.info("subscribing", { type: "*" })
82+
return Stream.fromPubSub(wildcardPubSub).pipe(
83+
Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))),
84+
)
3785
}
38-
},
86+
87+
// Shut down all PubSubs when the layer is torn down.
88+
// This causes Stream.fromPubSub consumers to end, triggering
89+
// their ensuring/finalizers.
90+
yield* Effect.addFinalizer(() =>
91+
Effect.gen(function* () {
92+
log.info("shutting down PubSubs")
93+
yield* PubSub.shutdown(wildcardPubSub)
94+
for (const ps of pubsubs.values()) {
95+
yield* PubSub.shutdown(ps)
96+
}
97+
}),
98+
)
99+
100+
return Service.of({ publish, subscribe, subscribeAll })
101+
}),
39102
)
40103

41-
export async function publish<Definition extends BusEvent.Definition>(
42-
def: Definition,
43-
properties: z.output<Definition["properties"]>,
44-
) {
45-
const payload = {
46-
type: def.type,
47-
properties,
48-
}
49-
log.info("publishing", {
50-
type: def.type,
51-
})
52-
const pending = []
53-
for (const key of [def.type, "*"]) {
54-
const match = [...(state().subscriptions.get(key) ?? [])]
55-
for (const sub of match) {
56-
pending.push(sub(payload))
57-
}
58-
}
59-
GlobalBus.emit("event", {
60-
directory: Instance.directory,
61-
payload,
62-
})
63-
return Promise.all(pending)
104+
// ---------------------------------------------------------------------------
105+
// Legacy adapters — plain function API wrapping the Effect service
106+
// ---------------------------------------------------------------------------
107+
108+
function runStream(stream: (svc: Interface) => Stream.Stream<Payload>, callback: (event: any) => void) {
109+
return runCallbackInstance(
110+
Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))),
111+
)
64112
}
65113

66-
export function subscribe<Definition extends BusEvent.Definition>(
67-
def: Definition,
68-
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
69-
) {
70-
return raw(def.type, callback)
114+
export function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
115+
return runPromiseInstance(Service.use((svc) => svc.publish(def, properties)))
71116
}
72117

73-
export function once<Definition extends BusEvent.Definition>(
74-
def: Definition,
75-
callback: (event: {
76-
type: Definition["type"]
77-
properties: z.infer<Definition["properties"]>
78-
}) => "done" | undefined,
79-
) {
80-
const unsub = subscribe(def, (event) => {
81-
if (callback(event)) unsub()
82-
})
118+
export function subscribe<D extends BusEvent.Definition>(def: D, callback: (event: Payload<D>) => void) {
119+
return runStream((svc) => svc.subscribe(def), callback)
83120
}
84121

85122
export function subscribeAll(callback: (event: any) => void) {
86-
return raw("*", callback)
87-
}
123+
const directory = Instance.directory
88124

89-
function raw(type: string, callback: (event: any) => void) {
90-
log.info("subscribing", { type })
91-
const subscriptions = state().subscriptions
92-
let match = subscriptions.get(type) ?? []
93-
match.push(callback)
94-
subscriptions.set(type, match)
125+
// InstanceDisposed is delivered via GlobalBus because the legacy
126+
// adapter's fiber starts asynchronously and may not be running when
127+
// disposal happens. In the Effect-native path, forkScoped + scope
128+
// closure handles this correctly. This bridge can be removed once
129+
// upstream PubSub.shutdown properly wakes suspended subscribers:
130+
// https://github.com/Effect-TS/effect-smol/pull/1800
131+
const onDispose = (evt: { directory?: string; payload: any }) => {
132+
if (evt.payload.type !== InstanceDisposed.type) return
133+
if (evt.directory !== directory) return
134+
callback(evt.payload)
135+
GlobalBus.off("event", onDispose)
136+
}
137+
GlobalBus.on("event", onDispose)
95138

139+
const interrupt = runStream((svc) => svc.subscribeAll(), callback)
96140
return () => {
97-
log.info("unsubscribing", { type })
98-
const match = subscriptions.get(type)
99-
if (!match) return
100-
const index = match.indexOf(callback)
101-
if (index === -1) return
102-
match.splice(index, 1)
141+
GlobalBus.off("event", onDispose)
142+
interrupt()
103143
}
104144
}
105145
}

packages/opencode/src/control-plane/workspace.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ export namespace Workspace {
124124
await parseSSE(res.body, stop, (event) => {
125125
GlobalBus.emit("event", {
126126
directory: space.id,
127-
payload: event,
127+
payload: event as { type: string; properties: Record<string, unknown> },
128128
})
129129
})
130130
// Wait 250ms and retry if SSE connection fails

packages/opencode/src/effect/instances.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
1+
import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect"
2+
import { Bus } from "@/bus"
23
import { File } from "@/file"
34
import { FileTime } from "@/file/time"
45
import { FileWatcher } from "@/file/watcher"
@@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry"
1617
export { InstanceContext } from "./instance-context"
1718

1819
export type InstanceServices =
20+
| Bus.Service
1921
| Question.Service
2022
| PermissionNext.Service
2123
| ProviderAuth.Service
@@ -36,6 +38,7 @@ export type InstanceServices =
3638
function lookup(_key: string) {
3739
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
3840
return Layer.mergeAll(
41+
Layer.fresh(Bus.layer),
3942
Layer.fresh(Question.layer),
4043
Layer.fresh(PermissionNext.layer),
4144
Layer.fresh(ProviderAuth.defaultLayer),
@@ -56,7 +59,23 @@ export class Instances extends ServiceMap.Service<Instances, LayerMap.LayerMap<s
5659
Instances,
5760
Effect.gen(function* () {
5861
const layerMap = yield* LayerMap.make(lookup, { idleTimeToLive: Infinity })
59-
const unregister = registerDisposer((directory) => Effect.runPromise(layerMap.invalidate(directory)))
62+
63+
// Force-invalidate closes the RcMap entry scope even when refCount > 0.
64+
// Standard RcMap.invalidate bails in that case, leaving long-running
65+
// consumer fibers orphaned. This is an upstream issue:
66+
// https://github.com/Effect-TS/effect-smol/pull/1799
67+
const forceInvalidate = (directory: string) =>
68+
Effect.gen(function* () {
69+
const rcMap = layerMap.rcMap
70+
if (rcMap.state._tag === "Closed") return
71+
const entry = MutableHashMap.get(rcMap.state.map, directory)
72+
if (entry._tag === "None") return
73+
MutableHashMap.remove(rcMap.state.map, directory)
74+
if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber)
75+
yield* Scope.close(entry.value.scope, Exit.void)
76+
}).pipe(Effect.uninterruptible, Effect.ignore)
77+
78+
const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory)))
6079
yield* Effect.addFinalizer(() => Effect.sync(unregister))
6180
return Instances.of(layerMap)
6281
}),

packages/opencode/src/effect/runtime.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
1818
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
1919
}
2020

21+
export function runCallbackInstance<A, E>(
22+
effect: Effect.Effect<A, E, InstanceServices>,
23+
): (interruptor?: number) => void {
24+
return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
25+
}
26+
2127
export function disposeRuntime() {
2228
return runtime.dispose()
2329
}

0 commit comments

Comments
 (0)