Skip to content

Commit bc0939c

Browse files
committed
Add span links support to telemetrygen
Assisted by Claude code using Gemini API.
1 parent 30367f0 commit bc0939c

File tree

5 files changed

+244
-3
lines changed

5 files changed

+244
-3
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: telemetrygen
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add span links support to telemetrygen
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43007]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The new --span-links flag allows generating spans with links to previously created spans.
20+
Each span can link to random existing span contexts, creating relationships between spans for testing
21+
distributed tracing scenarios. Links include attributes for link type and index identification.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

cmd/telemetrygen/pkg/traces/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Config struct {
2121
PropagateContext bool
2222
StatusCode string
2323
Batch bool
24+
NumSpanLinks int
2425

2526
SpanDuration time.Duration
2627
}
@@ -42,6 +43,7 @@ func (c *Config) Flags(fs *pflag.FlagSet) {
4243
fs.BoolVar(&c.PropagateContext, "marshal", c.PropagateContext, "Whether to marshal trace context via HTTP headers")
4344
fs.StringVar(&c.StatusCode, "status-code", c.StatusCode, "Status code to use for the spans, one of (Unset, Error, Ok) or the equivalent integer (0,1,2)")
4445
fs.BoolVar(&c.Batch, "batch", c.Batch, "Whether to batch traces")
46+
fs.IntVar(&c.NumSpanLinks, "span-links", c.NumSpanLinks, "Number of span links to generate for each span")
4547
fs.DurationVar(&c.SpanDuration, "span-duration", c.SpanDuration, "The duration of each generated span.")
4648
}
4749

@@ -57,6 +59,7 @@ func (c *Config) SetDefaults() {
5759
c.PropagateContext = false
5860
c.StatusCode = "0"
5961
c.Batch = true
62+
c.NumSpanLinks = 0
6063
c.SpanDuration = 123 * time.Microsecond
6164
}
6265

cmd/telemetrygen/pkg/traces/traces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"go.opentelemetry.io/otel/sdk/resource"
2222
sdktrace "go.opentelemetry.io/otel/sdk/trace"
2323
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
24+
"go.opentelemetry.io/otel/trace"
2425
"go.uber.org/zap"
2526
"golang.org/x/time/rate"
2627

@@ -153,6 +154,8 @@ func run(c *Config, logger *zap.Logger) error {
153154
loadSize: c.LoadSize,
154155
spanDuration: c.SpanDuration,
155156
allowFailures: c.AllowExportFailures,
157+
numSpanLinks: c.NumSpanLinks,
158+
spanContexts: make([]trace.SpanContext, 0),
156159
}
157160

158161
go w.simulateTraces(telemetryAttributes)

cmd/telemetrygen/pkg/traces/traces_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@ package traces
55

66
import (
77
"encoding/pem"
8+
"fmt"
89
"net/http"
910
"net/http/httptest"
1011
"net/url"
1112
"os"
1213
"path/filepath"
14+
"sync"
1315
"testing"
1416
"time"
1517

1618
"github.com/stretchr/testify/assert"
1719
"github.com/stretchr/testify/require"
1820
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
21+
"go.opentelemetry.io/otel/trace"
1922
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
2023

2124
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
@@ -227,6 +230,18 @@ func TestConfigValidation(t *testing.T) {
227230
expectError: true,
228231
description: "Config with negative traces should be invalid",
229232
},
233+
{
234+
name: "Valid config with span links",
235+
config: Config{
236+
Config: common.Config{
237+
WorkerCount: 1,
238+
},
239+
NumTraces: 5,
240+
NumSpanLinks: 2,
241+
},
242+
expectError: false,
243+
description: "Config with span links should be valid",
244+
},
230245
}
231246

232247
for _, tt := range tests {
@@ -411,3 +426,126 @@ func TestHTTPExporterOptions_HTTP(t *testing.T) {
411426
})
412427
}
413428
}
429+
430+
// TestSpanLinksGeneration tests the span links generation functionality
431+
func TestSpanLinksGeneration(t *testing.T) {
432+
tests := []struct {
433+
name string
434+
numSpanLinks int
435+
existingContexts int
436+
expectedLinkCount int
437+
description string
438+
}{
439+
{
440+
name: "No span links",
441+
numSpanLinks: 0,
442+
existingContexts: 5,
443+
expectedLinkCount: 0,
444+
description: "Should generate no links when numSpanLinks is 0",
445+
},
446+
{
447+
name: "With existing contexts",
448+
numSpanLinks: 3,
449+
existingContexts: 5,
450+
expectedLinkCount: 3,
451+
description: "Should generate links to random existing contexts",
452+
},
453+
{
454+
name: "No existing contexts",
455+
numSpanLinks: 3,
456+
existingContexts: 0,
457+
expectedLinkCount: 0,
458+
description: "Should generate no links when no existing contexts",
459+
},
460+
{
461+
name: "Fewer contexts than requested links",
462+
numSpanLinks: 5,
463+
existingContexts: 3,
464+
expectedLinkCount: 3,
465+
description: "Should generate links up to the number of available contexts",
466+
},
467+
}
468+
469+
for _, tt := range tests {
470+
t.Run(tt.name, func(t *testing.T) {
471+
w := &worker{
472+
numSpanLinks: tt.numSpanLinks,
473+
spanContexts: make([]trace.SpanContext, 0),
474+
spanContextsMu: sync.RWMutex{},
475+
}
476+
477+
// Add existing contexts for testing
478+
for i := 0; i < tt.existingContexts; i++ {
479+
traceID, _ := trace.TraceIDFromHex(fmt.Sprintf("%032d", i))
480+
spanID, _ := trace.SpanIDFromHex(fmt.Sprintf("%016d", i))
481+
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
482+
TraceID: traceID,
483+
SpanID: spanID,
484+
TraceFlags: trace.FlagsSampled,
485+
})
486+
w.addSpanContext(spanCtx)
487+
}
488+
489+
links := w.generateSpanLinks()
490+
491+
assert.Len(t, links, tt.expectedLinkCount, tt.description)
492+
493+
// Verify all links have random type and correct index
494+
for i, link := range links {
495+
// Verify link.type attribute is 'random'
496+
found := false
497+
for _, attr := range link.Attributes {
498+
if attr.Key == "link.type" && attr.Value.AsString() == "random" {
499+
found = true
500+
break
501+
}
502+
}
503+
assert.True(t, found, "Link should have 'link.type=random' attribute")
504+
505+
// Verify link.index attribute is present
506+
foundIndex := false
507+
for _, attr := range link.Attributes {
508+
if attr.Key == "link.index" && attr.Value.AsInt64() == int64(i) {
509+
foundIndex = true
510+
break
511+
}
512+
}
513+
assert.True(t, foundIndex, "Link should have correct 'link.index' attribute")
514+
}
515+
})
516+
}
517+
}
518+
519+
// TestDefaultSpanLinksConfiguration tests that the default span links configuration is correct
520+
func TestDefaultSpanLinksConfiguration(t *testing.T) {
521+
cfg := NewConfig()
522+
523+
assert.Equal(t, 0, cfg.NumSpanLinks, "Default NumSpanLinks should be 0")
524+
}
525+
526+
func TestSpanContextsBufferLimit(t *testing.T) {
527+
w := &worker{
528+
numSpanLinks: 2,
529+
spanContexts: make([]trace.SpanContext, 0),
530+
spanContextsMu: sync.RWMutex{},
531+
}
532+
533+
// Add more span contexts than the buffer limit
534+
for i := 0; i < 1200; i++ {
535+
traceID, _ := trace.TraceIDFromHex(fmt.Sprintf("%032d", i))
536+
spanID, _ := trace.SpanIDFromHex(fmt.Sprintf("%016d", i))
537+
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
538+
TraceID: traceID,
539+
SpanID: spanID,
540+
TraceFlags: trace.FlagsSampled,
541+
})
542+
w.addSpanContext(spanCtx)
543+
}
544+
545+
// Verify the buffer doesn't exceed the maximum size
546+
assert.LessOrEqual(t, len(w.spanContexts), 1000, "Span contexts buffer should not exceed maximum size")
547+
548+
// Verify we can still generate links with the buffered contexts
549+
links := w.generateSpanLinks()
550+
assert.Len(t, links, 2, "Should generate correct number of links even with buffer limit")
551+
}

cmd/telemetrygen/pkg/traces/worker.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-cont
66
import (
77
"context"
88
"fmt"
9+
"math/rand/v2"
910
"strconv"
1011
"sync"
1112
"sync/atomic"
@@ -35,15 +36,67 @@ type worker struct {
3536
wg *sync.WaitGroup // notify when done
3637
loadSize int // desired minimum size in MB of string data for each generated trace
3738
spanDuration time.Duration // duration of generated spans
39+
numSpanLinks int // number of span links to generate per span
3840
logger *zap.Logger
39-
allowFailures bool // whether to continue on export failures
41+
allowFailures bool // whether to continue on export failures
42+
spanContexts []trace.SpanContext // collection of span contexts for linking
43+
spanContextsMu sync.RWMutex // mutex for spanContexts slice
4044
}
4145

4246
const (
43-
fakeIP string = "1.2.3.4"
47+
fakeIP string = "1.2.3.4"
48+
maxSpanContextsBuffer int = 1000 // Maximum number of span contexts to keep for linking
4449
)
4550

46-
func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
51+
// addSpanContext safely adds a span context to the worker's collection
52+
// Maintains a circular buffer to prevent unbounded memory growth
53+
func (w *worker) addSpanContext(spanCtx trace.SpanContext) {
54+
w.spanContextsMu.Lock()
55+
defer w.spanContextsMu.Unlock()
56+
57+
w.spanContexts = append(w.spanContexts, spanCtx)
58+
59+
// Keep only the most recent span contexts to prevent memory growth
60+
if len(w.spanContexts) > maxSpanContextsBuffer {
61+
copy(w.spanContexts, w.spanContexts[len(w.spanContexts)-maxSpanContextsBuffer:])
62+
w.spanContexts = w.spanContexts[:maxSpanContextsBuffer]
63+
}
64+
}
65+
66+
// generateSpanLinks creates span links to random existing span contexts
67+
func (w *worker) generateSpanLinks() []trace.Link {
68+
if w.numSpanLinks <= 0 {
69+
return nil
70+
}
71+
72+
w.spanContextsMu.RLock()
73+
defer w.spanContextsMu.RUnlock()
74+
75+
availableContexts := len(w.spanContexts)
76+
if availableContexts == 0 {
77+
return nil
78+
}
79+
80+
links := make([]trace.Link, 0, w.numSpanLinks)
81+
82+
// Generate links to random existing span contexts
83+
for i := 0; i < w.numSpanLinks && i < availableContexts; i++ {
84+
randomIndex := rand.IntN(availableContexts)
85+
spanCtx := w.spanContexts[randomIndex]
86+
87+
links = append(links, trace.Link{
88+
SpanContext: spanCtx,
89+
Attributes: []attribute.KeyValue{
90+
attribute.String("link.type", "random"),
91+
attribute.Int("link.index", i),
92+
},
93+
})
94+
}
95+
96+
return links
97+
}
98+
99+
func (w *worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
47100
tracer := otel.Tracer("telemetrygen")
48101
limiter := rate.NewLimiter(w.limitPerSecond, 1)
49102
var i int
@@ -56,18 +109,25 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
56109
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
57110
}
58111

112+
// Generate span links for the parent span
113+
parentLinks := w.generateSpanLinks()
114+
59115
ctx, sp := tracer.Start(context.Background(), "lets-go", trace.WithAttributes(
60116
semconv.NetworkPeerAddress(fakeIP),
61117
semconv.PeerService("telemetrygen-server"),
62118
),
63119
trace.WithSpanKind(trace.SpanKindClient),
64120
trace.WithTimestamp(spanStart),
121+
trace.WithLinks(parentLinks...),
65122
)
66123
sp.SetAttributes(telemetryAttributes...)
67124
for j := 0; j < w.loadSize; j++ {
68125
sp.SetAttributes(common.CreateLoadAttribute(fmt.Sprintf("load-%v", j), 1))
69126
}
70127

128+
// Store the parent span context for potential future linking
129+
w.addSpanContext(sp.SpanContext())
130+
71131
childCtx := ctx
72132
if w.propagateContext {
73133
header := propagation.HeaderCarrier{}
@@ -84,15 +144,22 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
84144
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
85145
}
86146

147+
// Generate span links for child spans
148+
childLinks := w.generateSpanLinks()
149+
87150
_, child := tracer.Start(childCtx, "okey-dokey-"+strconv.Itoa(j), trace.WithAttributes(
88151
semconv.NetworkPeerAddress(fakeIP),
89152
semconv.PeerService("telemetrygen-client"),
90153
),
91154
trace.WithSpanKind(trace.SpanKindServer),
92155
trace.WithTimestamp(spanStart),
156+
trace.WithLinks(childLinks...),
93157
)
94158
child.SetAttributes(telemetryAttributes...)
95159

160+
// Store the child span context for potential future linking
161+
w.addSpanContext(child.SpanContext())
162+
96163
endTimestamp = trace.WithTimestamp(spanEnd)
97164
child.SetStatus(w.statusCode, "")
98165
child.End(endTimestamp)

0 commit comments

Comments
 (0)