Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .chloggen/telemetrygen-span-links.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: telemetrygen

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add span links support to telemetrygen

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43007]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The new --span-links flag allows generating spans with links to previously created spans.
Each span can link to random existing span contexts, creating relationships between spans for testing
distributed tracing scenarios. Links include attributes for link type and index identification.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 3 additions & 0 deletions cmd/telemetrygen/pkg/traces/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Config struct {
PropagateContext bool
StatusCode string
Batch bool
NumSpanLinks int

SpanDuration time.Duration
}
Expand All @@ -42,6 +43,7 @@ func (c *Config) Flags(fs *pflag.FlagSet) {
fs.BoolVar(&c.PropagateContext, "marshal", c.PropagateContext, "Whether to marshal trace context via HTTP headers")
fs.StringVar(&c.StatusCode, "status-code", c.StatusCode, "Status code to use for the spans, one of (Unset, Error, Ok) or the equivalent integer (0,1,2)")
fs.BoolVar(&c.Batch, "batch", c.Batch, "Whether to batch traces")
fs.IntVar(&c.NumSpanLinks, "span-links", c.NumSpanLinks, "Number of span links to generate for each span")
fs.DurationVar(&c.SpanDuration, "span-duration", c.SpanDuration, "The duration of each generated span.")
}

Expand All @@ -57,6 +59,7 @@ func (c *Config) SetDefaults() {
c.PropagateContext = false
c.StatusCode = "0"
c.Batch = true
c.NumSpanLinks = 0
c.SpanDuration = 123 * time.Microsecond
}

Expand Down
3 changes: 3 additions & 0 deletions cmd/telemetrygen/pkg/traces/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -153,6 +154,8 @@ func run(c *Config, logger *zap.Logger) error {
loadSize: c.LoadSize,
spanDuration: c.SpanDuration,
allowFailures: c.AllowExportFailures,
numSpanLinks: c.NumSpanLinks,
spanContexts: make([]trace.SpanContext, 0),
}

go w.simulateTraces(telemetryAttributes)
Expand Down
138 changes: 138 additions & 0 deletions cmd/telemetrygen/pkg/traces/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ package traces

import (
"encoding/pem"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/trace"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
Expand Down Expand Up @@ -227,6 +230,18 @@ func TestConfigValidation(t *testing.T) {
expectError: true,
description: "Config with negative traces should be invalid",
},
{
name: "Valid config with span links",
config: Config{
Config: common.Config{
WorkerCount: 1,
},
NumTraces: 5,
NumSpanLinks: 2,
},
expectError: false,
description: "Config with span links should be valid",
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -411,3 +426,126 @@ func TestHTTPExporterOptions_HTTP(t *testing.T) {
})
}
}

// TestSpanLinksGeneration tests the span links generation functionality
func TestSpanLinksGeneration(t *testing.T) {
tests := []struct {
name string
numSpanLinks int
existingContexts int
expectedLinkCount int
description string
}{
{
name: "No span links",
numSpanLinks: 0,
existingContexts: 5,
expectedLinkCount: 0,
description: "Should generate no links when numSpanLinks is 0",
},
{
name: "With existing contexts",
numSpanLinks: 3,
existingContexts: 5,
expectedLinkCount: 3,
description: "Should generate links to random existing contexts",
},
{
name: "No existing contexts",
numSpanLinks: 3,
existingContexts: 0,
expectedLinkCount: 0,
description: "Should generate no links when no existing contexts",
},
{
name: "Fewer contexts than requested links",
numSpanLinks: 5,
existingContexts: 3,
expectedLinkCount: 5,
description: "Should generate requested number of links (allows duplicates)",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &worker{
numSpanLinks: tt.numSpanLinks,
spanContexts: make([]trace.SpanContext, 0),
spanContextsMu: sync.RWMutex{},
}

// Add existing contexts for testing
for i := 0; i < tt.existingContexts; i++ {
traceID, _ := trace.TraceIDFromHex(fmt.Sprintf("%032d", i))
spanID, _ := trace.SpanIDFromHex(fmt.Sprintf("%016d", i))
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: trace.FlagsSampled,
})
w.addSpanContext(spanCtx)
}

links := w.generateSpanLinks()

assert.Len(t, links, tt.expectedLinkCount, tt.description)

// Verify all links have random type and correct index
for i, link := range links {
// Verify link.type attribute is 'random'
found := false
for _, attr := range link.Attributes {
if attr.Key == "link.type" && attr.Value.AsString() == "random" {
found = true
break
}
}
assert.True(t, found, "Link should have 'link.type=random' attribute")

// Verify link.index attribute is present
foundIndex := false
for _, attr := range link.Attributes {
if attr.Key == "link.index" && attr.Value.AsInt64() == int64(i) {
foundIndex = true
break
}
}
assert.True(t, foundIndex, "Link should have correct 'link.index' attribute")
}
})
}
}

// TestDefaultSpanLinksConfiguration tests that the default span links configuration is correct
func TestDefaultSpanLinksConfiguration(t *testing.T) {
cfg := NewConfig()

assert.Equal(t, 0, cfg.NumSpanLinks, "Default NumSpanLinks should be 0")
}

func TestSpanContextsBufferLimit(t *testing.T) {
w := &worker{
numSpanLinks: 2,
spanContexts: make([]trace.SpanContext, 0),
spanContextsMu: sync.RWMutex{},
}

// Add more span contexts than the buffer limit
for i := 0; i < 1200; i++ {
traceID, _ := trace.TraceIDFromHex(fmt.Sprintf("%032d", i))
spanID, _ := trace.SpanIDFromHex(fmt.Sprintf("%016d", i))
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: trace.FlagsSampled,
})
w.addSpanContext(spanCtx)
}

// Verify the buffer doesn't exceed the maximum size
assert.LessOrEqual(t, len(w.spanContexts), 1000, "Span contexts buffer should not exceed maximum size")

// Verify we can still generate links with the buffered contexts
links := w.generateSpanLinks()
assert.Len(t, links, 2, "Should generate correct number of links even with buffer limit")
}
72 changes: 69 additions & 3 deletions cmd/telemetrygen/pkg/traces/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"context"
"fmt"
"math/rand/v2"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -35,15 +36,66 @@ type worker struct {
wg *sync.WaitGroup // notify when done
loadSize int // desired minimum size in MB of string data for each generated trace
spanDuration time.Duration // duration of generated spans
numSpanLinks int // number of span links to generate per span
logger *zap.Logger
allowFailures bool // whether to continue on export failures
allowFailures bool // whether to continue on export failures
spanContexts []trace.SpanContext // collection of span contexts for linking
spanContextsMu sync.RWMutex // mutex for spanContexts slice
}

const (
fakeIP string = "1.2.3.4"
fakeIP string = "1.2.3.4"
maxSpanContextsBuffer int = 1000 // Maximum number of span contexts to keep for linking
)

func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
// addSpanContext safely adds a span context to the worker's collection
// Maintains a circular buffer to prevent unbounded memory growth
func (w *worker) addSpanContext(spanCtx trace.SpanContext) {
w.spanContextsMu.Lock()
defer w.spanContextsMu.Unlock()

w.spanContexts = append(w.spanContexts, spanCtx)

// Keep only the most recent span contexts to prevent memory growth
if len(w.spanContexts) > maxSpanContextsBuffer {
w.spanContexts = w.spanContexts[1 : maxSpanContextsBuffer+1]
}
}

// generateSpanLinks creates span links to random existing span contexts
func (w *worker) generateSpanLinks() []trace.Link {
if w.numSpanLinks <= 0 {
return nil
}

w.spanContextsMu.RLock()
defer w.spanContextsMu.RUnlock()

availableContexts := len(w.spanContexts)
if availableContexts == 0 {
return nil
}

links := make([]trace.Link, 0, w.numSpanLinks)

// Generate links to random existing span contexts
for i := 0; i < w.numSpanLinks; i++ {
randomIndex := rand.IntN(availableContexts)
spanCtx := w.spanContexts[randomIndex]

links = append(links, trace.Link{
SpanContext: spanCtx,
Attributes: []attribute.KeyValue{
attribute.String("link.type", "random"),
attribute.Int("link.index", i),
},
})
}

return links
}

func (w *worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
tracer := otel.Tracer("telemetrygen")
limiter := rate.NewLimiter(w.limitPerSecond, 1)
var i int
Expand All @@ -56,18 +108,25 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
}

// Generate span links for the parent span
parentLinks := w.generateSpanLinks()

ctx, sp := tracer.Start(context.Background(), "lets-go", trace.WithAttributes(
semconv.NetworkPeerAddress(fakeIP),
semconv.PeerService("telemetrygen-server"),
),
trace.WithSpanKind(trace.SpanKindClient),
trace.WithTimestamp(spanStart),
trace.WithLinks(parentLinks...),
)
sp.SetAttributes(telemetryAttributes...)
for j := 0; j < w.loadSize; j++ {
sp.SetAttributes(common.CreateLoadAttribute(fmt.Sprintf("load-%v", j), 1))
}

// Store the parent span context for potential future linking
w.addSpanContext(sp.SpanContext())

childCtx := ctx
if w.propagateContext {
header := propagation.HeaderCarrier{}
Expand All @@ -84,15 +143,22 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
}

// Generate span links for child spans
childLinks := w.generateSpanLinks()

_, child := tracer.Start(childCtx, "okey-dokey-"+strconv.Itoa(j), trace.WithAttributes(
semconv.NetworkPeerAddress(fakeIP),
semconv.PeerService("telemetrygen-client"),
),
trace.WithSpanKind(trace.SpanKindServer),
trace.WithTimestamp(spanStart),
trace.WithLinks(childLinks...),
)
child.SetAttributes(telemetryAttributes...)

// Store the child span context for potential future linking
w.addSpanContext(child.SpanContext())

endTimestamp = trace.WithTimestamp(spanEnd)
child.SetStatus(w.statusCode, "")
child.End(endTimestamp)
Expand Down
Loading