Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .nx/version-plans/multimodal-blob-offload.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
'@agentmark-ai/api-types': minor
'@agentmark-ai/api-schemas': minor
'@agentmark-ai/prompt-core': minor
'@agentmark-ai/ui-components': minor
---

Size-driven blob offload for trace I/O (multimodal output support).

Oversized span fields (image/audio/large text output, large inputs, tool calls)
are lifted to object storage at ingest; ClickHouse keeps an 8KB inline preview
plus a `BlobRefs` pointer, so the 128KB queue-message limit never truncates a
generation. Full-fidelity consumers fetch the full value back on demand.

- **api-types**: `Span` / `SpanIO` gain an optional `blobRefs` string (a
JSON-encoded array of offloaded-field pointers, `[{field,blob_id,size}]`);
`ExperimentItemSummary` gains an optional `blobRefs` so the experiment-detail
path can rehydrate offloaded item I/O. All changes are additive — existing
consumers are unaffected.
- **api-schemas**: `ExperimentItemSummarySchema` gains an optional `blobRefs`
(the gateway rehydrates the full value into `input`/`output` before
responding, so consumers may ignore it).
- **prompt-core**: the webhook runner records image/speech generation output via
`setSpanOutput` (the `agentmark.output` attribute) so generated media is
captured on the span and offloaded like any other oversized field.
- **ui-components**: the trace drawer's Input/Output tab renders every offloaded
field — image/audio inline (data URIs), full text/JSON otherwise — fetched on
demand via the host-provided `fetchBlob`; `OutputObject` is deduped when
`Output` is also offloaded.
4 changes: 4 additions & 0 deletions packages/api-schemas/src/schemas/experiments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ export const ExperimentItemSummarySchema = z.object({
tokens: z.number(),
model: z.string(),
scores: z.array(ExperimentItemScoreSchema),
// Present (carried from ClickHouse) only when this item's input/output was
// offloaded to object storage at ingest. The gateway rehydrates the full
// value into `input`/`output` before responding, so consumers can ignore it.
blobRefs: z.string().optional(),
});

export const ExperimentDetailSchema = ExperimentSummarySchema.extend({
Expand Down
13 changes: 13 additions & 0 deletions packages/api-types/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,13 @@ export interface Span {
spanKind: string;
serviceName: string;
promptName: string | null;
/**
* JSON array of pointers to oversized field payloads offloaded to object
* storage: `[{field,blob_id,size}]`. Empty/absent when nothing was offloaded.
* When present, the matching inline column (input/output/...) holds only a
* preview; the full value is fetched from storage by blob_id.
*/
blobRefs?: string;
}

/**
Expand All @@ -461,6 +468,12 @@ export interface SpanIO {
output: string;
outputObject: string | null;
toolCalls: string | null;
/**
* JSON array of pointers to oversized field payloads offloaded to object
* storage: `[{field,blob_id,size}]`. When present, the inline columns hold
* previews and the full values are fetched from storage by blob_id.
*/
blobRefs?: string;
/**
* Custom per-span metadata (raw map; reserved namespaces stripped at the API
* boundary). Optional on the internal type so existing producers/mocks need
Expand Down
30 changes: 28 additions & 2 deletions packages/prompt-core/src/webhook-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,24 @@ function setSpanInput(ctx: SpanLike, input: unknown): void {
}
}

/**
 * Record the prompt's output on the span as `agentmark.output` (the key the
 * normalizer reads as the output fallback). For image/speech prompts this is
 * what carries the generated media (`{mimeType, base64}`) into the trace so
 * the gateway can lift it to object storage at ingest — without it the
 * generated media is never captured on a span.
 *
 * Strings are recorded verbatim; every other value is JSON-serialized. A value
 * with no JSON representation (`undefined`, a function, a symbol) is skipped
 * rather than recorded, so the attribute is never set to `undefined`.
 *
 * Never throws: serialization failures (e.g. circular structures) and errors
 * raised by `setAttribute` are swallowed, because tracing is best-effort.
 *
 * @param ctx - Span that receives the attribute.
 * @param output - Prompt output to record; may be absent.
 */
function setSpanOutput(ctx: SpanLike, output: unknown): void {
  try {
    // JSON.stringify is typed as returning `string`, but at runtime it yields
    // `undefined` for undefined/function/symbol inputs — widen the type so the
    // guard below is checked by the compiler.
    const serialized: string | undefined =
      typeof output === "string" ? output : JSON.stringify(output);
    if (serialized === undefined) {
      return;
    }
    ctx.setAttribute("agentmark.output", serialized);
  } catch {
    /* tracing must never break the run */
  }
}

/**
* Record the prompt's configured model as `gen_ai.request.model` on the
* prompt span. The runner reads it from frontmatter (adapter-agnostic), so
Expand Down Expand Up @@ -887,7 +905,11 @@ export class WebhookRunner<
span: ctx,
promptName: frontmatter.name,
};
return executeImage(input, ctxExec);
const exec = await executeImage(input, ctxExec);
// result is Array<{ mimeType, base64 }> — captured so the gateway can
// lift it to object storage (see utils/media-extraction.ts).
setSpanOutput(ctx, (exec as { result?: unknown }).result);
return exec;
}
);
return { ...(await result), traceId } as WebhookPromptResponse;
Expand Down Expand Up @@ -926,7 +948,11 @@ export class WebhookRunner<
span: ctx,
promptName: frontmatter.name,
};
return executeSpeech(input, ctxExec);
const exec = await executeSpeech(input, ctxExec);
// result is { mimeType, base64, format } — captured so the gateway can
// lift it to object storage (see utils/media-extraction.ts).
setSpanOutput(ctx, (exec as { result?: unknown }).result);
return exec;
}
);
return { ...(await result), traceId } as WebhookPromptResponse;
Expand Down
57 changes: 57 additions & 0 deletions packages/prompt-core/test/webhook-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {
DatasetStreamChunk,
SpanLike,
ExperimentItemSpanHook,
PromptSpanHook,
} from "../src/index";
import { WebhookRunner } from "../src/webhook-runner";

Expand Down Expand Up @@ -613,6 +614,62 @@ describe("WebhookRunner — image/speech runPrompt", () => {
);
});

// ── setSpanOutput: capture the generated media on the span ──────────────────
// Without this the image/audio output is never recorded on a span, so the
// gateway has nothing to offload to object storage and the trace viewer has
// nothing to render.
/**
 * Builds a prompt-span hook backed by a fake span that stores every attribute
 * written to it. Returns the hook together with the attribute map so a test
 * can inspect what the code under test recorded on the span.
 */
function makeCapturingPromptHook() {
  const attrs: Record<string, string | number> = {};

  // Fake span: empty trace id, attributes land in `attrs`.
  const span: SpanLike = {
    traceId: "",
    setAttribute(key, value) {
      attrs[key] = value;
    },
  };

  // Runs the callback against the fake span and wraps its result with the
  // span's trace id.
  const runWithSpan = async <T>(_params: unknown, body: (s: SpanLike) => Promise<T>) => {
    const result = await body(span);
    return { result, traceId: span.traceId };
  };
  const hook = runWithSpan as PromptSpanHook;

  return { hook, attrs };
}

// Image prompts: the generated media array must be JSON-serialized onto the
// span under `agentmark.output`, alongside the model attribute.
it("captures the image result onto the span as agentmark.output (JSON array)", async () => {
  const capture = makeCapturingPromptHook();
  const generated = [{ mimeType: "image/png", base64: "iVBOR" }];
  const executor = makeMediaExecutor({
    image: async () => ({ type: "image", result: generated }),
  });
  const runner = new WebhookRunner(makeClient([]), executor, {
    promptSpanHook: capture.hook,
  });

  await runner.runPrompt(IMAGE_AST);

  const recorded = capture.attrs;
  expect(recorded["agentmark.output"]).toBe(JSON.stringify(generated));
  // setSpanModel still runs
  expect(recorded["gen_ai.request.model"]).toBe("test");
});

// Speech prompts: the generated audio object must be JSON-serialized onto the
// span under `agentmark.output`.
it("captures the speech result onto the span as agentmark.output", async () => {
  const capture = makeCapturingPromptHook();
  const generated = { mimeType: "audio/mpeg", base64: "SUQz", format: "mp3" };
  const executor = makeMediaExecutor({
    speech: async () => ({ type: "speech", result: generated }),
  });
  const runner = new WebhookRunner(makeClient([]), executor, {
    promptSpanHook: capture.hook,
  });

  await runner.runPrompt(SPEECH_AST);

  expect(capture.attrs["agentmark.output"]).toBe(JSON.stringify(generated));
});

// Tracing is best-effort: a span whose setAttribute always throws must not
// prevent the prompt run from returning its result.
it("does not break the run when setAttribute throws (tracing is best-effort)", async () => {
  const failingSpan: SpanLike = {
    traceId: "",
    setAttribute() {
      throw new Error("span backend down");
    },
  };
  const runOnFailingSpan = async <T>(
    _params: unknown,
    body: (s: SpanLike) => Promise<T>
  ) => {
    const result = await body(failingSpan);
    return { result, traceId: failingSpan.traceId };
  };
  const hook = runOnFailingSpan as PromptSpanHook;
  const executor = makeMediaExecutor({
    image: async () => ({
      type: "image",
      result: [{ mimeType: "image/png", base64: "x" }],
    }),
  });
  const runner = new WebhookRunner(makeClient([]), executor, { promptSpanHook: hook });

  // setSpanInput/Model/Output all swallow errors — the run still returns.
  const res: any = await runner.runPrompt(IMAGE_AST);
  expect(res.type).toBe("image");
});

it("throws 'Invalid prompt' when frontmatter declares no recognized config", async () => {
const exec = makeMediaExecutor({});
const runner = new WebhookRunner(makeClient([]), exec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const mergeSpanIO = (
output: io.output,
outputObject: io.outputObject,
toolCalls: io.toolCalls,
blobRefs: io.blobRefs,
} as SpanData["data"],
};
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ interface UseSpanPromptsResult {
objectResponse?: any;
documents?: RetrievalDocumentView[];
} | null;
/** JSON array of offloaded field pointers from the effective (IO-hydrated) span. */
blobRefs?: string;
isLoading: boolean;
isLoadingIO: boolean;
}
Expand Down Expand Up @@ -319,6 +321,9 @@ export const useSpanPrompts = (): UseSpanPromptsResult => {
return {
prompts,
outputData,
// From the effective span so it reflects the lazily-hydrated IO (the raw
// selectedSpan has no blobRefs until merge) — drives OffloadedOutput.
blobRefs: effectiveSpan?.data?.blobRefs,
isLoading: !selectedSpan,
isLoadingIO,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { useSpanPrompts } from "../hooks/use-span-prompts";
import { PromptList } from "./prompt-list";
import { OutputDisplay } from "./output-display";
import { OffloadedFields } from "./offloaded-fields";
import { TabPanel } from "@mui/lab";
import { Alert, Box, Skeleton } from "@mui/material";
import { useSpanInfoContext } from "../../span-info-provider";
Expand All @@ -15,7 +16,7 @@ const IOLoadingSkeleton = () => (
);

export const InputOutputTab = () => {
const { prompts, outputData, isLoadingIO } = useSpanPrompts();
const { prompts, outputData, isLoadingIO, blobRefs } = useSpanPrompts();
const { span } = useSpanInfoContext();

return (
Expand All @@ -39,6 +40,7 @@ export const InputOutputTab = () => {
<>
<PromptList prompts={prompts} />
<OutputDisplay outputData={outputData} />
{blobRefs && <OffloadedFields blobRefs={blobRefs} />}
</>
)}
{span.data.statusMessage && (
Expand Down
Loading
Loading