From 46b57a421c2aa79a9b3eb54cbefdce89e00edc16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Hrdli=C4=8Dka?= Date: Sun, 17 May 2026 23:00:07 +0200 Subject: [PATCH] fix(mcp): propagate W3C trace context on remote MCP HTTP calls --- packages/core/src/effect/observability.ts | 41 +++++-- packages/opencode/src/mcp/index.ts | 58 ++++++++- packages/opencode/test/mcp/headers.test.ts | 129 ++++++++++++++++++++- 3 files changed, 210 insertions(+), 18 deletions(-) diff --git a/packages/core/src/effect/observability.ts b/packages/core/src/effect/observability.ts index 0203079abe1e..5f139a1e647a 100644 --- a/packages/core/src/effect/observability.ts +++ b/packages/core/src/effect/observability.ts @@ -1,14 +1,19 @@ import { Effect, Layer, Logger } from "effect" import { FetchHttpClient } from "effect/unstable/http" import { OtlpLogger, OtlpSerialization } from "effect/unstable/observability" +import { context as otelContext } from "@opentelemetry/api" +import { AsyncLocalStorageContextManager } from "@opentelemetry/context-async-hooks" import * as EffectLogger from "./logger" import { Flag } from "../flag/flag" import { InstallationChannel, InstallationVersion } from "../installation/version" import { ensureProcessMetadata } from "../util/opencode-process" const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT -export const enabled = !!base +// Evaluated at access time so tests and runtime callers see the current +// env value rather than the snapshot captured when the Flag module loaded. +export const isEnabled = () => !!process.env["OTEL_EXPORTER_OTLP_ENDPOINT"] const processID = crypto.randomUUID() +const otelGlobalsKey = Symbol.for("opencode.otel.api.globals.initialized") const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS ? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce( @@ -67,22 +72,29 @@ function logs() { ).pipe(Layer.provide(OtlpSerialization.layerJson), Layer.provide(FetchHttpClient.layer)) } +// Install the OTel API global context manager idempotently. This is the +// same fix as PR #22645 (Kit Langton) but extracted so other modules can +// call it before the OTel SDK Layer has been built (e.g. from the MCP +// layer, which constructs its tracing fetch eagerly). Effect's tracer +// integration relies on `context.active()` returning the current span; +// without a real context manager it always returns ROOT_CONTEXT. +export function setupOtelApiGlobals(): void { + const state = globalThis as Record + if (state[otelGlobalsKey]) return + const mgr = new AsyncLocalStorageContextManager() + if (otelContext.setGlobalContextManager(mgr)) mgr.enable() + state[otelGlobalsKey] = true +} + const traces = async () => { const NodeSdk = await import("@effect/opentelemetry/NodeSdk") const OTLP = await import("@opentelemetry/exporter-trace-otlp-http") const SdkBase = await import("@opentelemetry/sdk-trace-base") - // @effect/opentelemetry creates a NodeTracerProvider but never calls // register(), so the global @opentelemetry/api context manager stays - // as the no-op default. Non-Effect code (like the AI SDK) that calls - // tracer.startActiveSpan() relies on context.active() to find the - // parent span - without a real context manager every span starts a - // new trace. Registering AsyncLocalStorageContextManager fixes this. - const { AsyncLocalStorageContextManager } = await import("@opentelemetry/context-async-hooks") - const { context } = await import("@opentelemetry/api") - const mgr = new AsyncLocalStorageContextManager() - mgr.enable() - context.setGlobalContextManager(mgr) + // as the no-op default. Non-Effect code (like the AI SDK) that reads + // context.active() needs a real context manager installed. + setupOtelApiGlobals() return NodeSdk.layer(() => ({ resource: resource(), @@ -104,4 +116,9 @@ export const layer = !base }), ) -export const Observability = { enabled, layer } +export const Observability = { + get enabled() { + return isEnabled() + }, + layer, +} diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 832811b281a5..4a268d215241 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -4,6 +4,7 @@ import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/ import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js" import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js" import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js" +import { context as otelContext, trace } from "@opentelemetry/api" import { CallToolResultSchema, ListToolsResultSchema, @@ -31,6 +32,7 @@ import { EffectBridge } from "@/effect/bridge" import { InstanceState } from "@/effect/instance-state" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" +import { Observability, setupOtelApiGlobals } from "@opencode-ai/core/effect/observability" const log = Log.create({ service: "mcp" }) const DEFAULT_TIMEOUT = 30_000 @@ -150,6 +152,55 @@ function listTools(key: string, client: MCPClient, timeout: number) { ) } +// Build a fetch-like function that injects the W3C `traceparent` header +// (and `tracestate`, when present) into every outgoing MCP HTTP request, +// so the remote MCP server's spans link back to the opencode trace. +// Returned only when distributed tracing is enabled — see isTracingEnabled(). +// +// We hand-roll the wrapper rather than importing `createMiddleware` from +// `@modelcontextprotocol/sdk/client/middleware.js` because that module +// transitively pulls in `auth.js` symbols (auth, extractWWWAuthenticateParams) +// that several existing tests do not mock. The shape we need is trivial. +function buildTracingFetch(): typeof fetch { + // Ensure the global context manager is installed so `context.active()` + // returns the current Effect span (rather than ROOT_CONTEXT). Safe to + // call repeatedly. + setupOtelApiGlobals() + return ((input: RequestInfo | URL, init?: RequestInit) => { + const headers = new Headers(input instanceof Request ? input.headers : undefined) + new Headers(init?.headers).forEach((value, key) => headers.set(key, value)) + injectTraceContextHeaders(headers) + // Resolve globalThis.fetch on each call so tests that swap it after + // wiring (and production code that wraps fetch later) still observe + // the current implementation. + if (input instanceof Request) { + return globalThis.fetch(new Request(input, { ...init, headers })) + } + return globalThis.fetch(input, { ...init, headers }) + }) as typeof fetch +} + +// Format the active span context as a W3C `traceparent` header and set it +// (along with `tracestate`, when non-empty) on the carrier. No-op when no +// valid span is active. Spec: https://www.w3.org/TR/trace-context/#traceparent-header +function injectTraceContextHeaders(headers: Headers): void { + const sc = trace.getSpan(otelContext.active())?.spanContext() + if (!sc || !trace.isSpanContextValid(sc)) return + const flags = (sc.traceFlags ?? 0).toString(16).padStart(2, "0") + headers.set("traceparent", `00-${sc.traceId}-${sc.spanId}-${flags}`) + const tracestate = sc.traceState?.serialize() + if (tracestate) headers.set("tracestate", tracestate) +} + +// Distributed trace propagation to MCP servers requires both: +// 1. OTel SDK is active (OTEL_EXPORTER_OTLP_ENDPOINT env var set), and +// 2. The user opted in via experimental.openTelemetry config flag. +// The first ensures the global context manager is registered; the second +// matches the gating used for AI SDK telemetry spans. +function isTracingEnabled(cfg: Config.Info): boolean { + return Observability.enabled && cfg.experimental?.openTelemetry === true +} + // Convert MCP tool definition to AI SDK Tool type function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool { const inputSchema = mcpTool.inputSchema @@ -270,6 +321,7 @@ export const layer = Layer.effect( const spawner = yield* ChildProcessSpawner.ChildProcessSpawner const auth = yield* McpAuth.Service const bus = yield* Bus.Service + const cfgSvc = yield* Config.Service type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport @@ -327,12 +379,16 @@ export const layer = Layer.effect( ) } + const cfg = yield* cfgSvc.get() + const tracingFetch = isTracingEnabled(cfg) ? buildTracingFetch() : undefined + const transports: Array<{ name: string; transport: TransportWithAuth }> = [ { name: "StreamableHTTP", transport: new StreamableHTTPClientTransport(url, { authProvider, requestInit: mcp.headers ? { headers: mcp.headers } : undefined, + fetch: tracingFetch, }), }, { @@ -340,6 +396,7 @@ export const layer = Layer.effect( transport: new SSEClientTransport(url, { authProvider, requestInit: mcp.headers ? { headers: mcp.headers } : undefined, + fetch: tracingFetch, }), }, ] @@ -470,7 +527,6 @@ export const layer = Layer.effect( log.info("create() successfully created client", { key, toolCount: listed.length }) return { mcpClient, status, defs: listed } satisfies CreateResult }) - const cfgSvc = yield* Config.Service const descendants = Effect.fnUntraced( function* (pid: number) { diff --git a/packages/opencode/test/mcp/headers.test.ts b/packages/opencode/test/mcp/headers.test.ts index c51ed00d32f6..b0b5a527dc9b 100644 --- a/packages/opencode/test/mcp/headers.test.ts +++ b/packages/opencode/test/mcp/headers.test.ts @@ -1,18 +1,19 @@ import { describe, expect, mock, beforeEach } from "bun:test" import { Effect } from "effect" +import { context as OtelContext, trace, TraceFlags } from "@opentelemetry/api" import { testEffect } from "../lib/effect" // Track what options were passed to each transport constructor const transportCalls: Array<{ type: "streamable" | "sse" url: string - options: { authProvider?: unknown; requestInit?: RequestInit } + options: { authProvider?: unknown; requestInit?: RequestInit; fetch?: typeof fetch } }> = [] // Mock the transport constructors to capture their arguments void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({ StreamableHTTPClientTransport: class MockStreamableHTTP { - constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit }) { + constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit; fetch?: typeof fetch }) { transportCalls.push({ type: "streamable", url: url.toString(), @@ -27,7 +28,7 @@ void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({ void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({ SSEClientTransport: class MockSSE { - constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit }) { + constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit; fetch?: typeof fetch }) { transportCalls.push({ type: "sse", url: url.toString(), @@ -44,8 +45,19 @@ beforeEach(() => { transportCalls.length = 0 }) -// Import MCP after mocking +// The OTEL endpoint env must remain set for the lifetime of this test file +// because Observability.enabled (the tracing fetch gate) reads it at access +// time. Restore on exit so concurrent test files aren't affected. +const otelEndpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT +process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "http://127.0.0.1:4318" +process.on("exit", () => { + if (otelEndpoint === undefined) delete process.env.OTEL_EXPORTER_OTLP_ENDPOINT + else process.env.OTEL_EXPORTER_OTLP_ENDPOINT = otelEndpoint +}) + +// Import MCP after mocking + env is set. const { MCP } = await import("../../src/mcp/index") + const it = testEffect(MCP.defaultLayer) describe("mcp.headers", () => { @@ -63,7 +75,6 @@ describe("mcp.headers", () => { }) .pipe(Effect.catch(() => Effect.void)) - // Both transports should have been created with headers expect(transportCalls.length).toBeGreaterThanOrEqual(1) for (const call of transportCalls) { @@ -124,3 +135,111 @@ describe("mcp.headers", () => { }), ) }) + +describe("mcp.tracing", () => { + it.instance( + "tracing fetch is attached when openTelemetry is enabled", + () => + Effect.gen(function* () { + const mcp = yield* MCP.Service + yield* mcp + .add("test-server", { + type: "remote", + url: "https://example.com/mcp", + }) + .pipe(Effect.catch(() => Effect.void)) + + expect(transportCalls.length).toBeGreaterThanOrEqual(1) + for (const call of transportCalls) { + expect(call.options.fetch).toBeDefined() + } + }), + { config: { experimental: { openTelemetry: true } } }, + ) + + it.instance( + "tracing fetch is not attached when openTelemetry is disabled", + () => + Effect.gen(function* () { + const mcp = yield* MCP.Service + yield* mcp + .add("test-server", { + type: "remote", + url: "https://example.com/mcp", + }) + .pipe(Effect.catch(() => Effect.void)) + + expect(transportCalls.length).toBeGreaterThanOrEqual(1) + for (const call of transportCalls) { + expect(call.options.fetch).toBeUndefined() + } + }), + { config: { experimental: { openTelemetry: false } } }, + ) + + it.instance( + "tracing fetch injects trace headers and preserves existing headers", + () => + Effect.gen(function* () { + const mcp = yield* MCP.Service + yield* mcp + .add("test-server", { + type: "remote", + url: "https://example.com/mcp", + headers: { + Authorization: "Bearer test-token", + }, + }) + .pipe(Effect.catch(() => Effect.void)) + + const call = transportCalls.find((item) => item.type === "streamable") + expect(call).toBeDefined() + expect(call?.options.fetch).toBeDefined() + + const originalFetch = globalThis.fetch + const seenHeaders: Record = {} + globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => { + const outgoing = input instanceof Request ? new Headers(input.headers) : new Headers(init?.headers) + outgoing.forEach((value, key) => { + seenHeaders[key] = value + }) + return new Response("ok", { status: 200 }) + }) as typeof fetch + + try { + const spanContext = trace.wrapSpanContext({ + traceId: "11111111111111111111111111111111", + spanId: "2222222222222222", + traceFlags: TraceFlags.SAMPLED, + }) + + yield* Effect.promise(() => + OtelContext.with(trace.setSpan(OtelContext.active(), spanContext), async () => { + await call?.options.fetch?.( + new Request("https://example.com/mcp", { + method: "POST", + headers: { + "x-from-request": "request-value", + authorization: "Bearer test-token", + }, + }), + { + headers: { + "x-from-init": "init-value", + }, + }, + ) + }), + ) + + expect(seenHeaders.authorization).toBe("Bearer test-token") + expect(seenHeaders["x-from-request"]).toBe("request-value") + expect(seenHeaders["x-from-init"]).toBe("init-value") + expect(seenHeaders.traceparent).toBe("00-11111111111111111111111111111111-2222222222222222-01") + } finally { + globalThis.fetch = originalFetch + } + }), + { config: { experimental: { openTelemetry: true } } }, + ) +})