Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions packages/core/src/effect/observability.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { Effect, Layer, Logger } from "effect"
import { FetchHttpClient } from "effect/unstable/http"
import { OtlpLogger, OtlpSerialization } from "effect/unstable/observability"
import { context as otelContext } from "@opentelemetry/api"
import { AsyncLocalStorageContextManager } from "@opentelemetry/context-async-hooks"
import * as EffectLogger from "./logger"
import { Flag } from "../flag/flag"
import { InstallationChannel, InstallationVersion } from "../installation/version"
import { ensureProcessMetadata } from "../util/opencode-process"

const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT
export const enabled = !!base
// Evaluated at access time so tests and runtime callers see the current
// env value rather than the snapshot captured when the Flag module loaded.
export const isEnabled = () => !!process.env["OTEL_EXPORTER_OTLP_ENDPOINT"]
const processID = crypto.randomUUID()
const otelGlobalsKey = Symbol.for("opencode.otel.api.globals.initialized")

const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS
? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce(
Expand Down Expand Up @@ -67,22 +72,29 @@ function logs() {
).pipe(Layer.provide(OtlpSerialization.layerJson), Layer.provide(FetchHttpClient.layer))
}

// Install the OTel API global context manager idempotently. This is the
// same fix as PR #22645 (Kit Langton) but extracted so other modules can
// call it before the OTel SDK Layer has been built (e.g. from the MCP
// layer, which constructs its tracing fetch eagerly). Effect's tracer
// integration relies on `context.active()` returning the current span;
// without a real context manager it always returns ROOT_CONTEXT.
export function setupOtelApiGlobals(): void {
const state = globalThis as Record<symbol, boolean>
if (state[otelGlobalsKey]) return
const mgr = new AsyncLocalStorageContextManager()
if (otelContext.setGlobalContextManager(mgr)) mgr.enable()
state[otelGlobalsKey] = true
}

const traces = async () => {
const NodeSdk = await import("@effect/opentelemetry/NodeSdk")
const OTLP = await import("@opentelemetry/exporter-trace-otlp-http")
const SdkBase = await import("@opentelemetry/sdk-trace-base")

// @effect/opentelemetry creates a NodeTracerProvider but never calls
// register(), so the global @opentelemetry/api context manager stays
// as the no-op default. Non-Effect code (like the AI SDK) that calls
// tracer.startActiveSpan() relies on context.active() to find the
// parent span - without a real context manager every span starts a
// new trace. Registering AsyncLocalStorageContextManager fixes this.
const { AsyncLocalStorageContextManager } = await import("@opentelemetry/context-async-hooks")
const { context } = await import("@opentelemetry/api")
const mgr = new AsyncLocalStorageContextManager()
mgr.enable()
context.setGlobalContextManager(mgr)
// as the no-op default. Non-Effect code (like the AI SDK) that reads
// context.active() needs a real context manager installed.
setupOtelApiGlobals()

return NodeSdk.layer(() => ({
resource: resource(),
Expand All @@ -104,4 +116,9 @@ export const layer = !base
}),
)

export const Observability = { enabled, layer }
export const Observability = {
get enabled() {
return isEnabled()
},
layer,
}
58 changes: 57 additions & 1 deletion packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js"
import { context as otelContext, trace } from "@opentelemetry/api"
import {
CallToolResultSchema,
ListToolsResultSchema,
Expand Down Expand Up @@ -31,6 +32,7 @@ import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Observability, setupOtelApiGlobals } from "@opencode-ai/core/effect/observability"

const log = Log.create({ service: "mcp" })
const DEFAULT_TIMEOUT = 30_000
Expand Down Expand Up @@ -150,6 +152,55 @@ function listTools(key: string, client: MCPClient, timeout: number) {
)
}

// Build a fetch-like function that injects the W3C `traceparent` header
// (and `tracestate`, when present) into every outgoing MCP HTTP request,
// so the remote MCP server's spans link back to the opencode trace.
// Returned only when distributed tracing is enabled — see isTracingEnabled().
//
// We hand-roll the wrapper rather than importing `createMiddleware` from
// `@modelcontextprotocol/sdk/client/middleware.js` because that module
// transitively pulls in `auth.js` symbols (auth, extractWWWAuthenticateParams)
// that several existing tests do not mock. The shape we need is trivial.
function buildTracingFetch(): typeof fetch {
// Ensure the global context manager is installed so `context.active()`
// returns the current Effect span (rather than ROOT_CONTEXT). Safe to
// call repeatedly.
setupOtelApiGlobals()
return ((input: RequestInfo | URL, init?: RequestInit) => {
const headers = new Headers(input instanceof Request ? input.headers : undefined)
new Headers(init?.headers).forEach((value, key) => headers.set(key, value))
injectTraceContextHeaders(headers)
// Resolve globalThis.fetch on each call so tests that swap it after
// wiring (and production code that wraps fetch later) still observe
// the current implementation.
if (input instanceof Request) {
return globalThis.fetch(new Request(input, { ...init, headers }))
}
return globalThis.fetch(input, { ...init, headers })
}) as typeof fetch
}

// Format the active span context as a W3C `traceparent` header and set it
// (along with `tracestate`, when non-empty) on the carrier. No-op when no
// valid span is active. Spec: https://www.w3.org/TR/trace-context/#traceparent-header
function injectTraceContextHeaders(headers: Headers): void {
const sc = trace.getSpan(otelContext.active())?.spanContext()
if (!sc || !trace.isSpanContextValid(sc)) return
const flags = (sc.traceFlags ?? 0).toString(16).padStart(2, "0")
headers.set("traceparent", `00-${sc.traceId}-${sc.spanId}-${flags}`)
const tracestate = sc.traceState?.serialize()
if (tracestate) headers.set("tracestate", tracestate)
}

// Distributed trace propagation to MCP servers requires both:
// 1. OTel SDK is active (OTEL_EXPORTER_OTLP_ENDPOINT env var set), and
// 2. The user opted in via experimental.openTelemetry config flag.
// The first ensures the global context manager is registered; the second
// matches the gating used for AI SDK telemetry spans.
function isTracingEnabled(cfg: Config.Info): boolean {
return Observability.enabled && cfg.experimental?.openTelemetry === true
}

// Convert MCP tool definition to AI SDK Tool type
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
const inputSchema = mcpTool.inputSchema
Expand Down Expand Up @@ -270,6 +321,7 @@ export const layer = Layer.effect(
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const auth = yield* McpAuth.Service
const bus = yield* Bus.Service
const cfgSvc = yield* Config.Service

type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport

Expand Down Expand Up @@ -327,19 +379,24 @@ export const layer = Layer.effect(
)
}

const cfg = yield* cfgSvc.get()
const tracingFetch = isTracingEnabled(cfg) ? buildTracingFetch() : undefined

const transports: Array<{ name: string; transport: TransportWithAuth }> = [
{
name: "StreamableHTTP",
transport: new StreamableHTTPClientTransport(url, {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
fetch: tracingFetch,
}),
},
{
name: "SSE",
transport: new SSEClientTransport(url, {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
fetch: tracingFetch,
}),
},
]
Expand Down Expand Up @@ -470,7 +527,6 @@ export const layer = Layer.effect(
log.info("create() successfully created client", { key, toolCount: listed.length })
return { mcpClient, status, defs: listed } satisfies CreateResult
})
const cfgSvc = yield* Config.Service

const descendants = Effect.fnUntraced(
function* (pid: number) {
Expand Down
129 changes: 124 additions & 5 deletions packages/opencode/test/mcp/headers.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import { describe, expect, mock, beforeEach } from "bun:test"
import { Effect } from "effect"
import { context as OtelContext, trace, TraceFlags } from "@opentelemetry/api"
import { testEffect } from "../lib/effect"

// Track what options were passed to each transport constructor
const transportCalls: Array<{
type: "streamable" | "sse"
url: string
options: { authProvider?: unknown; requestInit?: RequestInit }
options: { authProvider?: unknown; requestInit?: RequestInit; fetch?: typeof fetch }
}> = []

// Mock the transport constructors to capture their arguments
void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({
StreamableHTTPClientTransport: class MockStreamableHTTP {
constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit }) {
constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit; fetch?: typeof fetch }) {
transportCalls.push({
type: "streamable",
url: url.toString(),
Expand All @@ -27,7 +28,7 @@ void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({

void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({
SSEClientTransport: class MockSSE {
constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit }) {
constructor(url: URL, options?: { authProvider?: unknown; requestInit?: RequestInit; fetch?: typeof fetch }) {
transportCalls.push({
type: "sse",
url: url.toString(),
Expand All @@ -44,8 +45,19 @@ beforeEach(() => {
transportCalls.length = 0
})

// Import MCP after mocking
// The OTEL endpoint env must remain set for the lifetime of this test file
// because Observability.enabled (the tracing fetch gate) reads it at access
// time. Restore on exit so concurrent test files aren't affected.
const otelEndpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "http://127.0.0.1:4318"
process.on("exit", () => {
if (otelEndpoint === undefined) delete process.env.OTEL_EXPORTER_OTLP_ENDPOINT
else process.env.OTEL_EXPORTER_OTLP_ENDPOINT = otelEndpoint
})

// Import MCP after mocking + env is set.
const { MCP } = await import("../../src/mcp/index")

const it = testEffect(MCP.defaultLayer)

describe("mcp.headers", () => {
Expand All @@ -63,7 +75,6 @@ describe("mcp.headers", () => {
})
.pipe(Effect.catch(() => Effect.void))

// Both transports should have been created with headers
expect(transportCalls.length).toBeGreaterThanOrEqual(1)

for (const call of transportCalls) {
Expand Down Expand Up @@ -124,3 +135,111 @@ describe("mcp.headers", () => {
}),
)
})

describe("mcp.tracing", () => {
it.instance(
"tracing fetch is attached when openTelemetry is enabled",
() =>
Effect.gen(function* () {
const mcp = yield* MCP.Service
yield* mcp
.add("test-server", {
type: "remote",
url: "https://example.com/mcp",
})
.pipe(Effect.catch(() => Effect.void))

expect(transportCalls.length).toBeGreaterThanOrEqual(1)
for (const call of transportCalls) {
expect(call.options.fetch).toBeDefined()
}
}),
{ config: { experimental: { openTelemetry: true } } },
)

it.instance(
"tracing fetch is not attached when openTelemetry is disabled",
() =>
Effect.gen(function* () {
const mcp = yield* MCP.Service
yield* mcp
.add("test-server", {
type: "remote",
url: "https://example.com/mcp",
})
.pipe(Effect.catch(() => Effect.void))

expect(transportCalls.length).toBeGreaterThanOrEqual(1)
for (const call of transportCalls) {
expect(call.options.fetch).toBeUndefined()
}
}),
{ config: { experimental: { openTelemetry: false } } },
)

it.instance(
"tracing fetch injects trace headers and preserves existing headers",
() =>
Effect.gen(function* () {
const mcp = yield* MCP.Service
yield* mcp
.add("test-server", {
type: "remote",
url: "https://example.com/mcp",
headers: {
Authorization: "Bearer test-token",
},
})
.pipe(Effect.catch(() => Effect.void))

const call = transportCalls.find((item) => item.type === "streamable")
expect(call).toBeDefined()
expect(call?.options.fetch).toBeDefined()

const originalFetch = globalThis.fetch
const seenHeaders: Record<string, string> = {}
globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
const outgoing = input instanceof Request ? new Headers(input.headers) : new Headers(init?.headers)
outgoing.forEach((value, key) => {
seenHeaders[key] = value
})
return new Response("ok", { status: 200 })
}) as typeof fetch

try {
const spanContext = trace.wrapSpanContext({
traceId: "11111111111111111111111111111111",
spanId: "2222222222222222",
traceFlags: TraceFlags.SAMPLED,
})

yield* Effect.promise(() =>
OtelContext.with(trace.setSpan(OtelContext.active(), spanContext), async () => {
await call?.options.fetch?.(
new Request("https://example.com/mcp", {
method: "POST",
headers: {
"x-from-request": "request-value",
authorization: "Bearer test-token",
},
}),
{
headers: {
"x-from-init": "init-value",
},
},
)
}),
)

expect(seenHeaders.authorization).toBe("Bearer test-token")
expect(seenHeaders["x-from-request"]).toBe("request-value")
expect(seenHeaders["x-from-init"]).toBe("init-value")
expect(seenHeaders.traceparent).toBe("00-11111111111111111111111111111111-2222222222222222-01")
} finally {
globalThis.fetch = originalFetch
}
}),
{ config: { experimental: { openTelemetry: true } } },
)
})
Loading