Skip to content

Commit 3efe97e

Browse files
author
danshapiro
committed
Merge main into fix/package-json-path-resolution
2 parents 1ffc79d + 65b552c commit 3efe97e

19 files changed

Lines changed: 1155 additions & 265 deletions

server/session-scanner/queue.ts

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ const PRIORITY_ORDER: Record<Priority, number> = {
2222
disk: 3,
2323
}
2424

25+
const MAX_PROCESSED_CACHE = 1000
26+
2527
export interface QueueItem {
2628
sessionId: string
2729
filePath: string
@@ -40,18 +42,21 @@ interface WaitingPromise {
4042
*/
4143
export class SessionRepairQueue extends EventEmitter {
4244
private queue: QueueItem[] = []
45+
private queuedBySessionId: Map<string, QueueItem> = new Map()
4346
private processing: Set<string> = new Set()
4447
private processed: Map<string, SessionScanResult> = new Map()
4548
private scanner: SessionScanner
4649
private cache: SessionCache
4750
private running = false
4851
private stopped = false
4952
private waiting: Map<string, WaitingPromise[]> = new Map()
53+
private maxProcessedCache: number
5054

51-
constructor(scanner: SessionScanner, cache: SessionCache) {
55+
constructor(scanner: SessionScanner, cache: SessionCache, options?: { maxProcessedCache?: number }) {
5256
super()
5357
this.scanner = scanner
5458
this.cache = cache
59+
this.maxProcessedCache = options?.maxProcessedCache ?? MAX_PROCESSED_CACHE
5560
}
5661

5762
/**
@@ -65,32 +70,46 @@ export class SessionRepairQueue extends EventEmitter {
6570
enqueue(
6671
sessions: Array<{ sessionId: string; filePath: string; priority: Priority }>
6772
): void {
73+
let needsSort = false
74+
const now = Date.now()
75+
let order = 0
76+
6877
for (const session of sessions) {
6978
// Skip if currently processing
7079
if (this.processing.has(session.sessionId)) continue
7180

7281
// Check if already in queue
73-
const existing = this.queue.find((q) => q.sessionId === session.sessionId)
82+
const existing = this.queuedBySessionId.get(session.sessionId)
7483

7584
if (existing) {
85+
if (session.filePath && session.filePath !== existing.filePath) {
86+
existing.filePath = session.filePath
87+
}
7688
// Re-prioritize if new priority is higher (lower number)
7789
if (PRIORITY_ORDER[session.priority] < PRIORITY_ORDER[existing.priority]) {
7890
existing.priority = session.priority
79-
this.sortQueue()
91+
needsSort = true
8092
}
8193
} else if (session.filePath) {
8294
// Only add new item if we have a filePath
83-
this.queue.push({
95+
const item: QueueItem = {
8496
sessionId: session.sessionId,
8597
filePath: session.filePath,
8698
priority: session.priority,
87-
addedAt: Date.now(),
88-
})
89-
this.sortQueue()
99+
addedAt: now + order,
100+
}
101+
order += 1
102+
this.queue.push(item)
103+
this.queuedBySessionId.set(session.sessionId, item)
104+
needsSort = true
90105
}
91106
// If no existing entry and no filePath, skip silently
92107
// (session will be picked up on next full scan if it exists on disk)
93108
}
109+
110+
if (needsSort) {
111+
this.sortQueue()
112+
}
94113
}
95114

96115
/**
@@ -122,7 +141,11 @@ export class SessionRepairQueue extends EventEmitter {
122141
* Remove and return next item.
123142
*/
124143
dequeue(): QueueItem | undefined {
125-
return this.queue.shift()
144+
const item = this.queue.shift()
145+
if (item) {
146+
this.queuedBySessionId.delete(item.sessionId)
147+
}
148+
return item
126149
}
127150

128151
/**
@@ -153,7 +176,7 @@ export class SessionRepairQueue extends EventEmitter {
153176
// Check cache first
154177
const cached = await this.cache.get(item.filePath)
155178
if (cached) {
156-
this.processed.set(item.sessionId, cached)
179+
this.setProcessed(item.sessionId, cached)
157180
this.emit('scanned', cached)
158181
this.resolveWaiting(item.sessionId, cached)
159182
this.processing.delete(item.sessionId)
@@ -174,10 +197,10 @@ export class SessionRepairQueue extends EventEmitter {
174197
// Re-scan to get updated result
175198
const newResult = await this.scanner.scan(item.filePath)
176199
await this.cache.set(item.filePath, newResult)
177-
this.processed.set(item.sessionId, newResult)
200+
this.setProcessed(item.sessionId, newResult)
178201
this.resolveWaiting(item.sessionId, newResult)
179202
} else {
180-
this.processed.set(item.sessionId, scanResult)
203+
this.setProcessed(item.sessionId, scanResult)
181204
this.resolveWaiting(item.sessionId, scanResult)
182205
}
183206
} catch (err) {
@@ -220,6 +243,20 @@ export class SessionRepairQueue extends EventEmitter {
220243
}
221244
}
222245

246+
private setProcessed(sessionId: string, result: SessionScanResult): void {
247+
if (this.processed.has(sessionId)) {
248+
this.processed.delete(sessionId)
249+
}
250+
this.processed.set(sessionId, result)
251+
252+
if (this.processed.size > this.maxProcessedCache) {
253+
const oldest = this.processed.keys().next().value
254+
if (oldest) {
255+
this.processed.delete(oldest)
256+
}
257+
}
258+
}
259+
223260
/**
224261
* Stop processing (graceful shutdown).
225262
*/
@@ -248,7 +285,7 @@ export class SessionRepairQueue extends EventEmitter {
248285
}
249286

250287
// Check if in queue or processing
251-
const inQueue = this.queue.some((q) => q.sessionId === sessionId)
288+
const inQueue = this.queuedBySessionId.has(sessionId)
252289
const isProcessing = this.processing.has(sessionId)
253290

254291
if (!inQueue && !isProcessing) {

server/ws-handler.ts

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ import { configStore } from './config-store.js'
88
import type { ClaudeSessionManager } from './claude-session.js'
99
import type { ClaudeEvent } from './claude-stream-types.js'
1010
import type { SessionRepairService } from './session-scanner/service.js'
11+
import type { SessionScanResult, SessionRepairResult } from './session-scanner/types.js'
1112

1213
const MAX_CONNECTIONS = Number(process.env.MAX_CONNECTIONS || 10)
1314
const HELLO_TIMEOUT_MS = Number(process.env.HELLO_TIMEOUT_MS || 5_000)
1415
const PING_INTERVAL_MS = Number(process.env.PING_INTERVAL_MS || 30_000)
16+
const MAX_WS_BUFFERED_AMOUNT = Number(process.env.MAX_WS_BUFFERED_AMOUNT || 2 * 1024 * 1024)
1517

1618
// Extended WebSocket with liveness tracking for keepalive
1719
interface LiveWebSocket extends WebSocket {
@@ -142,6 +144,7 @@ type ClientState = {
142144
attachedTerminalIds: Set<string>
143145
createdByRequestId: Map<string, string>
144146
claudeSessions: Set<string>
147+
claudeSubscriptions: Map<string, () => void>
145148
interestedSessions: Set<string>
146149
helloTimer?: NodeJS.Timeout
147150
}
@@ -152,6 +155,10 @@ export class WsHandler {
152155
private clientStates = new Map<LiveWebSocket, ClientState>()
153156
private pingInterval: NodeJS.Timeout | null = null
154157
private sessionRepairService?: SessionRepairService
158+
private sessionRepairListeners?: {
159+
scanned: (result: SessionScanResult) => void
160+
repaired: (result: SessionRepairResult) => void
161+
}
155162

156163
constructor(
157164
server: http.Server,
@@ -182,24 +189,28 @@ export class WsHandler {
182189

183190
// Subscribe to session repair events
184191
if (this.sessionRepairService) {
185-
this.sessionRepairService.on('scanned', (result) => {
192+
const onScanned = (result: SessionScanResult) => {
186193
this.broadcastSessionStatus(result.sessionId, {
187194
type: 'session.status',
188195
sessionId: result.sessionId,
189196
status: result.status === 'healthy' ? 'healthy' : 'corrupted',
190197
chainDepth: result.chainDepth,
191198
})
192-
})
199+
}
193200

194-
this.sessionRepairService.on('repaired', (result) => {
201+
const onRepaired = (result: SessionRepairResult) => {
195202
this.broadcastSessionStatus(result.sessionId, {
196203
type: 'session.status',
197204
sessionId: result.sessionId,
198205
status: 'repaired',
199206
chainDepth: result.newChainDepth,
200207
orphansFixed: result.orphansFixed,
201208
})
202-
})
209+
}
210+
211+
this.sessionRepairListeners = { scanned: onScanned, repaired: onRepaired }
212+
this.sessionRepairService.on('scanned', onScanned)
213+
this.sessionRepairService.on('repaired', onRepaired)
203214
}
204215
}
205216

@@ -259,6 +270,7 @@ export class WsHandler {
259270
attachedTerminalIds: new Set(),
260271
createdByRequestId: new Map(),
261272
claudeSessions: new Set(),
273+
claudeSubscriptions: new Map(),
262274
interestedSessions: new Set(),
263275
}
264276
this.clientStates.set(ws, state)
@@ -291,10 +303,29 @@ export class WsHandler {
291303
this.registry.detach(terminalId, ws)
292304
}
293305
state.attachedTerminalIds.clear()
306+
for (const off of state.claudeSubscriptions.values()) {
307+
off()
308+
}
309+
state.claudeSubscriptions.clear()
310+
}
311+
312+
private removeClaudeSubscription(state: ClientState, sessionId: string) {
313+
const off = state.claudeSubscriptions.get(sessionId)
314+
if (off) {
315+
off()
316+
state.claudeSubscriptions.delete(sessionId)
317+
}
294318
}
295319

296320
private send(ws: LiveWebSocket, msg: unknown) {
297321
try {
322+
// Backpressure guard.
323+
// @ts-ignore
324+
const buffered = ws.bufferedAmount as number | undefined
325+
if (typeof buffered === 'number' && buffered > MAX_WS_BUFFERED_AMOUNT) {
326+
ws.close(CLOSE_CODES.BACKPRESSURE, 'Backpressure')
327+
return
328+
}
298329
ws.send(JSON.stringify(msg))
299330
} catch {
300331
// ignore
@@ -307,12 +338,16 @@ export class WsHandler {
307338
}
308339
}
309340

310-
private sendError(ws: LiveWebSocket, params: { code: z.infer<typeof ErrorCode>; message: string; requestId?: string }) {
341+
private sendError(
342+
ws: LiveWebSocket,
343+
params: { code: z.infer<typeof ErrorCode>; message: string; requestId?: string; terminalId?: string }
344+
) {
311345
this.send(ws, {
312346
type: 'error',
313347
code: params.code,
314348
message: params.message,
315349
requestId: params.requestId,
350+
terminalId: params.terminalId,
316351
timestamp: nowIso(),
317352
})
318353
}
@@ -445,7 +480,7 @@ export class WsHandler {
445480
case 'terminal.attach': {
446481
const rec = this.registry.attach(m.terminalId, ws)
447482
if (!rec) {
448-
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Unknown terminalId' })
483+
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Unknown terminalId', terminalId: m.terminalId })
449484
return
450485
}
451486
state.attachedTerminalIds.add(m.terminalId)
@@ -458,7 +493,7 @@ export class WsHandler {
458493
const ok = this.registry.detach(m.terminalId, ws)
459494
state.attachedTerminalIds.delete(m.terminalId)
460495
if (!ok) {
461-
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Unknown terminalId' })
496+
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Unknown terminalId', terminalId: m.terminalId })
462497
return
463498
}
464499
this.send(ws, { type: 'terminal.detached', terminalId: m.terminalId })
@@ -469,23 +504,23 @@ export class WsHandler {
469504
case 'terminal.input': {
470505
const ok = this.registry.input(m.terminalId, m.data)
471506
if (!ok) {
472-
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Terminal not running' })
507+
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Terminal not running', terminalId: m.terminalId })
473508
}
474509
return
475510
}
476511

477512
case 'terminal.resize': {
478513
const ok = this.registry.resize(m.terminalId, m.cols, m.rows)
479514
if (!ok) {
480-
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Terminal not running' })
515+
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Terminal not running', terminalId: m.terminalId })
481516
}
482517
return
483518
}
484519

485520
case 'terminal.kill': {
486521
const ok = this.registry.kill(m.terminalId)
487522
if (!ok) {
488-
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Unknown terminalId' })
523+
this.sendError(ws, { code: 'INVALID_TERMINAL_ID', message: 'Unknown terminalId', terminalId: m.terminalId })
489524
return
490525
}
491526
this.broadcast({ type: 'terminal.list.updated' })
@@ -531,29 +566,40 @@ export class WsHandler {
531566
// Track this client's session
532567
state.claudeSessions.add(session.id)
533568

534-
// Stream events to client
535-
session.on('event', (event: ClaudeEvent) => {
569+
// Stream events to client with detachable listeners
570+
const onEvent = (event: ClaudeEvent) => {
536571
this.safeSend(ws, {
537572
type: 'claude.event',
538573
sessionId: session.id,
539574
event,
540575
})
541-
})
576+
}
542577

543-
session.on('exit', (code: number) => {
578+
const onExit = (code: number) => {
544579
this.safeSend(ws, {
545580
type: 'claude.exit',
546581
sessionId: session.id,
547582
exitCode: code,
548583
})
549-
})
584+
this.removeClaudeSubscription(state, session.id)
585+
}
550586

551-
session.on('stderr', (text: string) => {
587+
const onStderr = (text: string) => {
552588
this.safeSend(ws, {
553589
type: 'claude.stderr',
554590
sessionId: session.id,
555591
text,
556592
})
593+
}
594+
595+
session.on('event', onEvent)
596+
session.on('exit', onExit)
597+
session.on('stderr', onStderr)
598+
599+
state.claudeSubscriptions.set(session.id, () => {
600+
session.off('event', onEvent)
601+
session.off('exit', onExit)
602+
session.off('stderr', onStderr)
557603
})
558604

559605
this.send(ws, {
@@ -596,6 +642,7 @@ export class WsHandler {
596642

597643
const removed = this.claudeManager.remove(m.sessionId)
598644
state.claudeSessions.delete(m.sessionId)
645+
this.removeClaudeSubscription(state, m.sessionId)
599646
this.send(ws, {
600647
type: 'claude.killed',
601648
sessionId: m.sessionId,
@@ -622,6 +669,12 @@ export class WsHandler {
622669
* Gracefully close all WebSocket connections and the server.
623670
*/
624671
close(): void {
672+
if (this.sessionRepairService && this.sessionRepairListeners) {
673+
this.sessionRepairService.off('scanned', this.sessionRepairListeners.scanned)
674+
this.sessionRepairService.off('repaired', this.sessionRepairListeners.repaired)
675+
this.sessionRepairListeners = undefined
676+
}
677+
625678
// Stop keepalive ping interval
626679
if (this.pingInterval) {
627680
clearInterval(this.pingInterval)

0 commit comments

Comments
 (0)