Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/opencode/src/bus/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ export const GlobalBus = new EventEmitter<{
},
]
}>()
GlobalBus.setMaxListeners(100)
6 changes: 5 additions & 1 deletion packages/opencode/src/server/adapter.bun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ export const adapter: Adapter = {
const args = {
fetch: app.fetch,
hostname: opts.hostname,
idleTimeout: 0,
// Default is 10s which is too aggressive for SSE connections.
// 0 disables the timeout entirely — dead connections (CLOSE_WAIT) are
// never cleaned up, causing unbounded memory growth. 120s gives the
// cleanup chain enough time to fire while still bounding leak duration.
idleTimeout: 120,
websocket: ws.websocket,
} as const
const start = (port: number) => {
Expand Down
16 changes: 15 additions & 1 deletion packages/opencode/src/server/instance/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,25 @@ export const EventRoutes = () =>
})

stream.onAbort(stop)
// Second abort path: req.raw.signal fires before stream.onAbort on direct
// connections (~2ms earlier), and provides an independent cleanup path when
// the responseReadable.cancel() chain is broken (e.g. reverse proxy).
// Hono only registers this on Bun 1.0/1.1 (isOldBunVersion gate); we add
// it unconditionally so Bun 1.2+ is also covered.
c.req.raw.signal.addEventListener("abort", stop)

try {
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
try {
await stream.writeSSE({ data })
} catch {
// Hono 4.x StreamingApi.write() has an empty catch — this block
// never fires on the current version. Kept for forward compatibility
// in case a future Hono version propagates write errors.
stop()
return
}
}
} finally {
stop()
Expand Down
16 changes: 15 additions & 1 deletion packages/opencode/src/server/instance/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,25 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
const unsub = subscribe(q)

stream.onAbort(stop)
// Second abort path: req.raw.signal fires before stream.onAbort on direct
// connections (~2ms earlier), and provides an independent cleanup path when
// the responseReadable.cancel() chain is broken (e.g. reverse proxy).
// Hono only registers this on Bun 1.0/1.1 (isOldBunVersion gate); we add
// it unconditionally so Bun 1.2+ is also covered.
c.req.raw.signal.addEventListener("abort", stop)

try {
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
try {
await stream.writeSSE({ data })
} catch {
// Hono 4.x StreamingApi.write() has an empty catch — this block
// never fires on the current version. Kept for forward compatibility
// in case a future Hono version propagates write errors.
stop()
return
}
}
} finally {
stop()
Expand Down
12 changes: 10 additions & 2 deletions packages/opencode/src/util/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ export class AsyncQueue<T> implements AsyncIterable<T> {
private queue: T[] = []
private resolvers: ((value: T) => void)[] = []

/** `limit` bounds the internal buffer; beyond it the oldest entry is dropped. */
constructor(private limit = 1000) {}

/**
 * Enqueue an item. If a consumer is already awaiting `next()`, the item is
 * delivered to it directly (bypassing the buffer). Otherwise the item is
 * buffered; when the buffer is full, the oldest entry is dropped first so a
 * stalled or dead consumer can never cause unbounded memory growth.
 */
push(item: T) {
  const resolve = this.resolvers.shift()
  if (resolve) {
    // A consumer is waiting — hand the item over immediately.
    resolve(item)
  } else {
    if (this.queue.length >= this.limit) {
      // Buffer full: drop the oldest item to keep memory bounded.
      this.queue.shift()
    }
    this.queue.push(item)
  }
}

async next(): Promise<T> {
Expand Down
70 changes: 70 additions & 0 deletions packages/opencode/test/util/queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { describe, expect, test } from "bun:test"
import { AsyncQueue } from "../../src/util/queue"

describe("AsyncQueue", () => {
  test("basic FIFO order", async () => {
    const q = new AsyncQueue<number>()
    q.push(1)
    q.push(2)
    q.push(3)
    expect(await q.next()).toBe(1)
    expect(await q.next()).toBe(2)
    expect(await q.next()).toBe(3)
  })

  test("drops oldest entry when limit is reached", async () => {
    const q = new AsyncQueue<number>(3)
    q.push(1)
    q.push(2)
    q.push(3)
    q.push(4) // should drop 1
    expect(await q.next()).toBe(2)
    expect(await q.next()).toBe(3)
    expect(await q.next()).toBe(4)
  })

  test("queue length never exceeds limit", () => {
    const limit = 5
    const q = new AsyncQueue<number>(limit)
    for (let i = 0; i < 100; i++) q.push(i)
    // Drop-oldest semantics guarantee the buffer holds exactly `limit`
    // items after overflow — assert the exact length rather than a bound.
    expect((q as any).queue.length).toBe(limit)
  })

  test("item delivered directly to waiting resolver bypasses limit", async () => {
    const q = new AsyncQueue<number>(2)
    // consumer is already waiting
    const result = q.next()
    q.push(99)
    expect(await result).toBe(99)
    // internal queue should still be empty
    expect((q as any).queue.length).toBe(0)
  })

  test("null sentinel terminates async iteration", async () => {
    const q = new AsyncQueue<number | null>()
    q.push(1)
    q.push(2)
    q.push(null)
    const collected: number[] = []
    for await (const item of q) {
      if (item === null) break
      collected.push(item)
    }
    expect(collected).toEqual([1, 2])
  })

  test("zombie scenario: 10000 pushes with no consumer stays bounded", () => {
    const limit = 100
    const q = new AsyncQueue<number>(limit)
    for (let i = 0; i < 10_000; i++) q.push(i)
    expect((q as any).queue.length).toBe(limit)
    // exactly the most recent `limit` items are retained
    expect((q as any).queue[0]).toBe(9900)
    expect((q as any).queue[(q as any).queue.length - 1]).toBe(9999)
  })
})
Loading