Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 54 additions & 31 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ function isMcpConfigured(entry: McpEntry): entry is ConfigMCPV1.Info {
}

const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_")
const MAX_LIST_PAGES = 1_000

function remoteURL(key: string, value: string) {
if (URL.canParse(value)) return new URL(value)
Expand All @@ -123,32 +124,43 @@ function isOutputSchemaValidationError(error: Error) {
)
}

function listTools(key: string, client: MCPClient, timeout: number) {
async function paginate<T, R extends { nextCursor?: string }>(
list: (cursor?: string) => Promise<R>,
items: (result: R) => T[],
) {
const result: T[] = []
const cursors = new Set<string>()
let cursor: string | undefined

for (let page = 0; page < MAX_LIST_PAGES; page++) {
const page = await list(cursor)
result.push(...items(page))
if (page.nextCursor === undefined) return result
if (cursors.has(page.nextCursor)) throw new Error(`MCP list returned duplicate cursor: ${page.nextCursor}`)
cursors.add(page.nextCursor)
cursor = page.nextCursor
}

throw new Error(`MCP list exceeded ${MAX_LIST_PAGES} pages`)
}

function listTools(client: MCPClient, timeout: number) {
return Effect.tryPromise({
try: () => client.listTools(undefined, { timeout }),
try: () =>
paginate(
async (cursor) => {
const params = cursor === undefined ? undefined : { cursor }
try {
return await client.listTools(params, { timeout })
} catch (error) {
if (!(error instanceof Error) || !isOutputSchemaValidationError(error)) throw error
return client.request({ method: "tools/list", params }, TolerantListToolsResultSchema, { timeout })
}
},
(result) => result.tools,
),
catch: (err) => (err instanceof Error ? err : new Error(String(err))),
}).pipe(
Effect.map((result) => result.tools),
Effect.catch((error) => {
if (!isOutputSchemaValidationError(error)) return Effect.fail(error)

return Effect.tryPromise({
try: () =>
client.request({ method: "tools/list" }, TolerantListToolsResultSchema, {
timeout,
}),
catch: (err) => (err instanceof Error ? err : new Error(String(err))),
}).pipe(
Effect.map((result) =>
result.tools.map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
})),
),
)
}),
)
})
}

// Convert MCP tool definition to AI SDK Tool type
Expand Down Expand Up @@ -182,8 +194,8 @@ function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number
})
}

function defs(key: string, client: MCPClient, timeout?: number) {
return listTools(key, client, timeout ?? DEFAULT_TIMEOUT).pipe(
function defs(client: MCPClient, timeout?: number) {
return listTools(client, timeout ?? DEFAULT_TIMEOUT).pipe(
Effect.catch((err) => {
return Effect.succeed(undefined)
}),
Expand Down Expand Up @@ -448,7 +460,7 @@ export const layer = Layer.effect(
return { status } satisfies CreateResult
}

const listed = mcpClient.getServerCapabilities()?.tools ? yield* defs(key, mcpClient, mcp.timeout) : []
const listed = mcpClient.getServerCapabilities()?.tools ? yield* defs(mcpClient, mcp.timeout) : []
if (!listed) {
yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore)
return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult
Expand Down Expand Up @@ -487,7 +499,7 @@ export const layer = Layer.effect(
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

const listed = await bridge.promise(defs(name, client, timeout))
const listed = await bridge.promise(defs(client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

Expand Down Expand Up @@ -693,7 +705,13 @@ export const layer = Layer.effect(
const s = yield* InstanceState.get(state)
return yield* collectFromConnected(
s,
(c) => (c.getServerCapabilities()?.prompts ? c.listPrompts().then((r) => r.prompts) : Promise.resolve([])),
(c) =>
c.getServerCapabilities()?.prompts
? paginate(
(cursor) => c.listPrompts(cursor === undefined ? undefined : { cursor }),
(result) => result.prompts,
)
: Promise.resolve([]),
"prompts",
)
})
Expand All @@ -703,7 +721,12 @@ export const layer = Layer.effect(
return yield* collectFromConnected(
s,
(c) =>
c.getServerCapabilities()?.resources ? c.listResources().then((r) => r.resources) : Promise.resolve([]),
c.getServerCapabilities()?.resources
? paginate(
(cursor) => c.listResources(cursor === undefined ? undefined : { cursor }),
(result) => result.resources,
)
: Promise.resolve([]),
"resources",
)
})
Expand Down Expand Up @@ -830,7 +853,7 @@ export const layer = Layer.effect(

const listed = client
? client.getServerCapabilities()?.tools
? yield* defs(mcpName, client, mcpConfig.timeout)
? yield* defs(client, mcpConfig.timeout)
: []
: undefined
if (!client || !listed) {
Expand Down
130 changes: 125 additions & 5 deletions packages/opencode/test/mcp/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ interface MockClientState {
listResourcesShouldFail: boolean
prompts: Array<{ name: string; description?: string }>
resources: Array<{ name: string; uri: string; description?: string }>
toolPages: Record<
string,
{
tools: Array<{ name: string; description?: string; inputSchema: object; outputSchema?: object }>
nextCursor?: string
}
>
promptPages: Record<string, { prompts: Array<{ name: string; description?: string }>; nextCursor?: string }>
resourcePages: Record<
string,
{ resources: Array<{ name: string; uri: string; description?: string }>; nextCursor?: string }
>
closed: boolean
notificationHandlers: Map<unknown, (...args: any[]) => any>
}
Expand Down Expand Up @@ -50,6 +62,9 @@ function getOrCreateClientState(name?: string): MockClientState {
listResourcesShouldFail: false,
prompts: [],
resources: [],
toolPages: {},
promptPages: {},
resourcePages: {},
closed: false,
notificationHandlers: new Map(),
}
Expand Down Expand Up @@ -143,33 +158,48 @@ void mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({
return this._state?.capabilities
}

async listTools() {
async listTools(params?: { cursor?: string }) {
if (this._state) this._state.listToolsCalls++
if (this._state?.listToolsShouldFail) {
throw new Error(this._state.listToolsError)
}
const page = this._state?.toolPages[params === undefined ? "initial" : (params.cursor ?? "")]
if (page) return page
return { tools: this._state?.tools ?? [] }
}

async request(request: { method: string }, schema: { parse: (value: unknown) => unknown }) {
async request(
request: { method: string; params?: { cursor?: string } },
schema: { parse: (value: unknown) => unknown },
) {
if (this._state) this._state.requestCalls++
if (request.method === "tools/list") return schema.parse({ tools: this._state?.tools ?? [] })
if (request.method === "tools/list") {
return schema.parse(
this._state?.toolPages[request.params === undefined ? "initial" : (request.params.cursor ?? "")] ?? {
tools: this._state?.tools ?? [],
},
)
}
throw new Error(`unsupported request: ${request.method}`)
}

async listPrompts() {
async listPrompts(params?: { cursor?: string }) {
if (this._state) this._state.listPromptsCalls++
if (this._state?.listPromptsShouldFail) {
throw new Error("listPrompts failed")
}
const page = this._state?.promptPages[params === undefined ? "initial" : (params.cursor ?? "")]
if (page) return page
return { prompts: this._state?.prompts ?? [] }
}

async listResources() {
async listResources(params?: { cursor?: string }) {
if (this._state) this._state.listResourcesCalls++
if (this._state?.listResourcesShouldFail) {
throw new Error("listResources failed")
}
const page = this._state?.resourcePages[params === undefined ? "initial" : (params.cursor ?? "")]
if (page) return page
return { resources: this._state?.resources ?? [] }
}

Expand Down Expand Up @@ -234,6 +264,96 @@ it.instance(
{ config: { mcp: {} } },
)

it.instance(
"follows cursors when listing tools, prompts, and resources",
() =>
MCP.Service.use((mcp: MCPNS.Interface) =>
Effect.gen(function* () {
lastCreatedClientName = "paged-server"
const serverState = getOrCreateClientState("paged-server")
serverState.toolPages = {
initial: {
tools: [{ name: "tool-one", inputSchema: { type: "object", properties: {} } }],
nextCursor: "tools-2",
},
"tools-2": { tools: [{ name: "tool-two", inputSchema: { type: "object", properties: {} } }] },
}
serverState.promptPages = {
initial: { prompts: [{ name: "prompt-one" }], nextCursor: "prompts-2" },
"prompts-2": { prompts: [{ name: "prompt-two" }] },
}
serverState.resourcePages = {
initial: { resources: [{ name: "resource-one", uri: "test://one" }], nextCursor: "resources-2" },
"resources-2": { resources: [{ name: "resource-two", uri: "test://two" }] },
}

yield* mcp.add("paged-server", {
type: "local",
command: ["echo", "test"],
})

expect(Object.keys(yield* mcp.tools())).toEqual(["paged-server_tool-one", "paged-server_tool-two"])
expect(Object.keys(yield* mcp.prompts())).toEqual(["paged-server:prompt-one", "paged-server:prompt-two"])
expect(Object.keys(yield* mcp.resources())).toEqual(["paged-server:resource-one", "paged-server:resource-two"])
expect(serverState.listToolsCalls).toBe(2)
expect(serverState.listPromptsCalls).toBe(2)
expect(serverState.listResourcesCalls).toBe(2)
}),
),
{ config: { mcp: {} } },
)

it.instance(
"stops listing when a server repeats a cursor",
() =>
MCP.Service.use((mcp: MCPNS.Interface) =>
Effect.gen(function* () {
lastCreatedClientName = "looping-server"
const serverState = getOrCreateClientState("looping-server")
serverState.toolPages = {
initial: { tools: [], nextCursor: "repeat" },
repeat: { tools: [], nextCursor: "repeat" },
}

yield* mcp.add("looping-server", {
type: "local",
command: ["echo", "test"],
})

expect(serverState.listToolsCalls).toBe(2)
expect(yield* mcp.tools()).toEqual({})
}),
),
{ config: { mcp: {} } },
)

it.instance(
"follows empty cursors",
() =>
MCP.Service.use((mcp: MCPNS.Interface) =>
Effect.gen(function* () {
lastCreatedClientName = "empty-cursor-server"
const serverState = getOrCreateClientState("empty-cursor-server")
serverState.promptPages = {
initial: { prompts: [{ name: "prompt-one" }], nextCursor: "" },
"": { prompts: [{ name: "prompt-two" }] },
}

yield* mcp.add("empty-cursor-server", {
type: "local",
command: ["echo", "test"],
})

expect(Object.keys(yield* mcp.prompts())).toEqual([
"empty-cursor-server:prompt-one",
"empty-cursor-server:prompt-two",
])
expect(serverState.listPromptsCalls).toBe(2)
}),
),
{ config: { mcp: {} } },
)

// ========================================================================
// Test: tool change notifications refresh the cache
// ========================================================================
Expand Down
Loading