Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,582 changes: 797 additions & 785 deletions proto/gen/rill/runtime/v1/api.pb.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions proto/gen/rill/runtime/v1/api.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5445,6 +5445,8 @@ definitions:
properties:
message:
$ref: '#/definitions/v1Message'
result:
$ref: '#/definitions/v1Message'
title: Response message for RuntimeService.GetAIMessage
v1GetConversationResponse:
type: object
Expand Down
1 change: 1 addition & 0 deletions proto/rill/runtime/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ message GetAIMessageRequest {
// Response message for RuntimeService.GetAIMessage
message GetAIMessageResponse {
Message message = 1;
Message result = 2;
}

// **********
Expand Down
54 changes: 47 additions & 7 deletions runtime/ai/metrics_view_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/modelcontextprotocol/go-sdk/mcp"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/metricsview"
"github.com/rilldata/rill/runtime/pkg/mapstructureutil"
)

const QueryMetricsViewName = "query_metrics_view"
Expand All @@ -25,10 +26,14 @@ var _ Tool[QueryMetricsViewArgs, *QueryMetricsViewResult] = (*QueryMetricsView)(
type QueryMetricsViewArgs map[string]any

type QueryMetricsViewResult struct {
Schema []SchemaField `json:"schema"`
Data [][]any `json:"data"`
OpenURL string `json:"open_url,omitempty"`
TruncationWarning string `json:"truncation_warning,omitempty"`
Schema []SchemaField `json:"schema"`
Data [][]any `json:"data"`
// ResolvedTimeRange & ResolvedComparisonTimeRange store the exact time ranges used for the query.
// This helps when opening the citation url and get the exact time range for relative time ranges.
ResolvedTimeRange *metricsview.TimeRange `json:"resolved_time_range,omitempty"`
ResolvedComparisonTimeRange *metricsview.TimeRange `json:"resolved_comparison_time_range,omitempty"`
OpenURL string `json:"open_url,omitempty"`
TruncationWarning string `json:"truncation_warning,omitempty"`
}

func (t *QueryMetricsView) Spec() *mcp.Tool {
Expand Down Expand Up @@ -273,6 +278,12 @@ func (t *QueryMetricsView) Handler(ctx context.Context, args QueryMetricsViewArg
return nil, err
}

// Resolve time ranges and store them in the result to record the exact resolved time ranges for this tool call
tr, ctr, err := resolveTimeRanges(res)
if err != nil {
return nil, err
}
Comment on lines +281 to +285
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid duplicate logic and execution (timestamps may not always be cached), can we return the time ranges from the metrics resolver (you can use ResolverResult.Meta() map for them), and in turn push it into the executor (which already has the necessary logic, and the timestamps binding ensures no duplicate queries)?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok updated to return the resolved time ranges from metrics resolver.


// Generate an open URL for the query
openURL, err := t.generateOpenURL(ctx, session.InstanceID(), session.ID(), session.ParentID)
if err != nil {
Expand All @@ -281,9 +292,11 @@ func (t *QueryMetricsView) Handler(ctx context.Context, args QueryMetricsViewArg

// Build the result
result := &QueryMetricsViewResult{
Schema: schema,
Data: data,
OpenURL: openURL,
Schema: schema,
Data: data,
OpenURL: openURL,
ResolvedTimeRange: tr,
ResolvedComparisonTimeRange: ctr,
}
if isSystemLimit && int64(len(data)) >= limit { // Add a warning if we hit the system limit
msg := fmt.Sprintf("The system truncated the result to %d rows", limit)
Expand Down Expand Up @@ -321,3 +334,30 @@ func (t *QueryMetricsView) generateOpenURL(ctx context.Context, instanceID, sess

return openURL.String(), nil
}

func resolveTimeRanges(res runtime.ResolverResult) (*metricsview.TimeRange, *metricsview.TimeRange, error) {
meta := res.Meta()
if meta == nil {
return nil, nil, nil
}

var tr *metricsview.TimeRange
rawTr, ok := meta["time_range"]
if ok {
tr = &metricsview.TimeRange{}
if err := mapstructureutil.WeakDecode(rawTr, tr); err != nil {
return nil, nil, err
}
}

var ctr *metricsview.TimeRange
rawCtr, ok := meta["comparison_time_range"]
if ok {
ctr = &metricsview.TimeRange{}
if err := mapstructureutil.WeakDecode(rawCtr, ctr); err != nil {
return nil, nil, err
}
}

return tr, ctr, nil
}
66 changes: 66 additions & 0 deletions runtime/ai/metrics_view_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,69 @@ cache:
require.Equal(t, nil, res.Data[0][0])
require.Equal(t, nil, res.Data[0][1])
}

func TestMetricsViewQueryResolvedTimeRanges(t *testing.T) {
// Setup a metrics view with a time dimension. The watermark defaults to the max event_time, i.e. 2025-05-13T00:00:00Z.
rt, instanceID := testruntime.NewInstanceWithOptions(t, testruntime.InstanceOptions{
Files: map[string]string{
"test_data.sql": `
SELECT '2025-05-10T00:00:00Z'::TIMESTAMP AS event_time, 'US' AS country, 100 AS revenue
UNION ALL
SELECT '2025-05-11T00:00:00Z'::TIMESTAMP AS event_time, 'US' AS country, 200 AS revenue
UNION ALL
SELECT '2025-05-12T00:00:00Z'::TIMESTAMP AS event_time, 'US' AS country, 300 AS revenue
UNION ALL
SELECT '2025-05-13T00:00:00Z'::TIMESTAMP AS event_time, 'US' AS country, 400 AS revenue
`,
"test_metrics.yaml": `
type: metrics_view
model: test_data
timeseries: event_time
dimensions:
- column: country
measures:
- name: total_revenue
expression: SUM(revenue)
explore:
skip: true
`,
},
Variables: map[string]string{
"rill.ai.require_time_range": "false",
},
})
testruntime.RequireReconcileState(t, rt, instanceID, 3, 0, 0)

s := newSession(t, rt, instanceID)

baseArgs := func() ai.QueryMetricsViewArgs {
return ai.QueryMetricsViewArgs{
"metrics_view": "test_metrics",
"dimensions": []map[string]any{{"name": "country"}},
"measures": []map[string]any{{"name": "total_revenue"}},
}
}

t.Run("no time range", func(t *testing.T) {
var res *ai.QueryMetricsViewResult
_, err := s.CallTool(t.Context(), ai.RoleUser, ai.QueryMetricsViewName, &res, baseArgs())
require.NoError(t, err)
require.Nil(t, res.ResolvedTimeRange)
require.Nil(t, res.ResolvedComparisonTimeRange)
})

t.Run("expression time range and comparison", func(t *testing.T) {
args := baseArgs()
args["time_range"] = map[string]any{"expression": "1D as of watermark/D"}
args["comparison_time_range"] = map[string]any{"expression": "1D as of watermark/D offset -1D"}
var res *ai.QueryMetricsViewResult
_, err := s.CallTool(t.Context(), ai.RoleUser, ai.QueryMetricsViewName, &res, args)
require.NoError(t, err)
require.NotNil(t, res.ResolvedTimeRange)
require.Equal(t, parseTestTime(t, "2025-05-12T00:00:00Z"), res.ResolvedTimeRange.Start)
require.Equal(t, parseTestTime(t, "2025-05-13T00:00:00Z"), res.ResolvedTimeRange.End)
require.NotNil(t, res.ResolvedComparisonTimeRange)
require.Equal(t, parseTestTime(t, "2025-05-11T00:00:00Z"), res.ResolvedComparisonTimeRange.Start)
require.Equal(t, parseTestTime(t, "2025-05-12T00:00:00Z"), res.ResolvedComparisonTimeRange.End)
})
}
10 changes: 5 additions & 5 deletions runtime/metricsview/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ type Sort struct {
type TimeRange struct {
Start time.Time `json:"start" mapstructure:"start"`
End time.Time `json:"end" mapstructure:"end"`
Expression string `json:"expression" mapstructure:"expression"`
IsoDuration string `json:"iso_duration" mapstructure:"iso_duration"`
IsoOffset string `json:"iso_offset" mapstructure:"iso_offset"`
RoundToGrain TimeGrain `json:"round_to_grain" mapstructure:"round_to_grain"`
TimeDimension string `json:"time_dimension" mapstructure:"time_dimension"` // optional time dimension to use for time-based operations, if not specified, the default time dimension in the metrics view is used
Expression string `json:"expression,omitempty" mapstructure:"expression"`
IsoDuration string `json:"iso_duration,omitempty" mapstructure:"iso_duration"`
IsoOffset string `json:"iso_offset,omitempty" mapstructure:"iso_offset"`
RoundToGrain TimeGrain `json:"round_to_grain,omitempty" mapstructure:"round_to_grain"`
TimeDimension string `json:"time_dimension,omitempty" mapstructure:"time_dimension"` // optional time dimension to use for time-based operations, if not specified, the default time dimension in the metrics view is used
}

func (tr *TimeRange) IsZero() bool {
Expand Down
14 changes: 14 additions & 0 deletions runtime/resolvers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ func (r *metricsResolver) ResolveInteractive(ctx context.Context) (runtime.Resol
return nil, err
}

// Add resolved time ranges to metadata
if r.query.TimeRange != nil {
meta["time_range"] = &metricsview.TimeRange{
Start: r.query.TimeRange.Start,
End: r.query.TimeRange.End,
}
}
if r.query.ComparisonTimeRange != nil {
meta["comparison_time_range"] = &metricsview.TimeRange{
Start: r.query.ComparisonTimeRange.Start,
End: r.query.ComparisonTimeRange.End,
}
}

return runtime.NewDriverResolverResult(res, meta), nil
}

Expand Down
11 changes: 11 additions & 0 deletions runtime/server/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,19 @@ func (s *Server) GetAIMessage(ctx context.Context, req *runtimev1.GetAIMessageRe
return nil, status.Errorf(codes.Internal, "failed to convert message to protobuf: %v", err)
}

var resPbMsg *runtimev1.Message
resMsg, ok := session.Message(ai.FilterByParent(req.MessageId), ai.FilterByType(ai.MessageTypeResult))
// Don't throw if there is no result message.
if ok {
resPbMsg, err = messageToPB(session, resMsg)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to convert message to protobuf: %v", err)
}
}

return &runtimev1.GetAIMessageResponse{
Message: pbMsg,
Result: resPbMsg,
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions runtime/server/chat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ measures:
})
require.NoError(t, err)
require.Equal(t, res1.Messages[0], msgRes.Message)
require.Equal(t, res1.Messages[5], msgRes.Result)

// Check it errors if completing a conversation that doesn't exist
_, err = srv.Complete(fooCtx, &runtimev1.CompleteRequest{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ import { fetchMessage } from "@rilldata/web-common/features/chat/core/citation-u
export async function load({ params: { conversationId, messageId }, parent }) {
const { runtime } = await parent();

const message = await fetchMessage(runtime, conversationId, messageId);
const { message, result } = await fetchMessage(
runtime,
conversationId,
messageId,
);

return {
message,
result,
};
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { maybeGetMetricsResolverQueryFromMessage } from "@rilldata/web-common/features/chat/core/citation-url-utils.ts";
import {
getResolvedTimeRangesFromMessage,
maybeGetMetricsResolverQueryFromMessage,
} from "@rilldata/web-common/features/chat/core/citation-url-utils.ts";
import { openQuery } from "@rilldata/web-common/features/explore-mappers/open-query.ts";
import { getCloudRuntimeClient } from "@rilldata/web-admin/lib/runtime-client";

export async function load({ parent, params: { organization, project } }) {
const { runtime, message } = await parent();
const { runtime, message, result } = await parent();
const client = getCloudRuntimeClient(runtime);

const query = maybeGetMetricsResolverQueryFromMessage(message);
const resolvedTimeRanges = getResolvedTimeRangesFromMessage(result);

await openQuery({
query,
mapArgs: { query, ...resolvedTimeRanges },
client,
organization,
project,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import { getQueryFromUrl } from "@rilldata/web-common/features/chat/core/citatio

export const load: PageLoad = async ({ params, url, parent }) => {
const { runtime } = await parent();
const client = getCloudRuntimeClient(runtime!);
const client = getCloudRuntimeClient(runtime);

const query = getQueryFromUrl(url);

await openQuery({
query,
mapArgs: { query },
organization: params.organization,
project: params.project,
client,
Expand Down
29 changes: 27 additions & 2 deletions web-common/src/features/chat/core/citation-url-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import {
type V1GetAIMessageResponse,
type V1Message,
} from "@rilldata/web-common/runtime-client";
import type { Schema as MetricsResolverQuery } from "@rilldata/web-common/runtime-client/gen/resolvers/metrics/schema.ts";
import type {
Schema as MetricsResolverQuery,
TimeRange,
} from "@rilldata/web-common/runtime-client/gen/resolvers/metrics/schema.ts";
import {
MessageType,
ToolName,
Expand Down Expand Up @@ -36,7 +39,10 @@ export async function fetchMessage(
const toolCallResp = (await resp.json()) as V1GetAIMessageResponse;

// 200 response should always have a message.
return toolCallResp.message!;
return {
message: toolCallResp.message!,
result: toolCallResp.result,
};
} catch (e) {
const apiError = e.response?.data?.message;
// Rethrow api error. Top level message will just be InternalError
Expand Down Expand Up @@ -71,6 +77,25 @@ export function maybeGetMetricsResolverQueryFromMessage(message: V1Message) {
return rawJson as MetricsResolverQuery;
}

export function getResolvedTimeRangesFromMessage(
resultMessage: V1Message | undefined,
): {
resolvedTimeRange?: TimeRange;
resolvedComparisonTimeRange?: TimeRange;
} {
if (!resultMessage?.contentData) return {};
try {
const resultData = JSON.parse(resultMessage.contentData);
return {
resolvedTimeRange: resultData.resolved_time_range,
resolvedComparisonTimeRange: resultData.resolved_comparison_time_range,
};
} catch (e) {
console.error("Failed to parse result message JSON", e);
}
return {};
}

const closingRoundBracketRegex = /\)$/;
const closingCurlyBracketRegex = /}$/;

Expand Down
Loading
Loading