Skip to content
38 changes: 33 additions & 5 deletions libdd-data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,26 @@ fn encode_stats_payload(
) -> pb::ClientStatsPayload {
pb::ClientStatsPayload {
hostname: meta.hostname.clone(),
env: meta.env.clone(),
lang: meta.language.clone(),
env: if meta.env.is_empty() {
"unknown-env".to_string()
} else {
meta.env.clone()
},
version: meta.app_version.clone(),
runtime_id: meta.runtime_id.clone(),
tracer_version: meta.tracer_version.clone(),
sequence,
service: meta.service.clone(),
stats: buckets,
git_commit_sha: meta.git_commit_sha.clone(),
process_tags: meta.process_tags.clone(),
// These fields are unused or will be set by the Agent
service: String::new(),
// These fields will be set by the Agent
container_id: String::new(),
tags: Vec::new(),
agent_aggregation: String::new(),
image_tag: String::new(),
process_tags_hash: 0,
lang: String::new(),
tracer_version: String::new(),
}
}

Expand Down Expand Up @@ -387,4 +391,28 @@ mod tests {
"Expected max retry attempts"
);
}

#[test]
fn test_encode_stats_payload_defaults_empty_env() {
    // Metadata whose env is blank must be exported with the "unknown-env" placeholder.
    let blank_env_meta = {
        let mut meta = get_test_metadata();
        meta.env = String::new();
        meta
    };
    let defaulted = encode_stats_payload(&blank_env_meta, 1, Vec::new());
    assert_eq!(
        defaulted.env, "unknown-env",
        "Empty env should default to 'unknown-env'"
    );

    // Metadata that already carries an env must be exported with that env unchanged.
    let untouched_meta = get_test_metadata();
    let preserved = encode_stats_payload(&untouched_meta, 2, Vec::new());
    assert_eq!(
        preserved.env, "test",
        "Non-empty env should be preserved"
    );
}
}
11 changes: 7 additions & 4 deletions libdd-trace-stats/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,16 @@ impl<'a> BorrowedAggregationKey<'a> {
/// key.
pub(super) fn from_span<T: StatSpan<'a>>(span: &'a T, peer_tag_keys: &'a [String]) -> Self {
let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default();
let peer_tags = if client_or_producer(span_kind) {
let peer_tags = if should_track_peer_tags(span_kind) {
// Parse the meta tags of the span and return a list of the peer tags based on the list
// of `peer_tag_keys`
peer_tag_keys
.iter()
.filter_map(|key| Some(((key.as_str()), (span.get_meta(key.as_str())?))))
.collect()
} else if let Some(base_service) = span.get_meta("_dd.base_service") {
// Internal spans with a base service override use only _dd.base_service as peer tag
vec![("_dd.base_service", base_service)]
} else {
vec![]
};
Expand Down Expand Up @@ -279,14 +282,14 @@ impl From<pb::ClientGroupedStats> for OwnedAggregationKey {
}
}

/// Return true if the span kind is "client" or "producer"
fn client_or_producer<T>(span_kind: T) -> bool
/// Return true if we care about peer tags on the span, i.e. if the span kind is "client",
/// "producer" or "consumer".
///
/// The span kind is lowercased before being compared, so the check is case-insensitive.
fn should_track_peer_tags<T>(span_kind: T) -> bool
where
T: SpanText,
{
matches!(
span_kind.borrow().to_lowercase().as_str(),
"client" | "producer"
"client" | "producer" | "consumer"
)
}

Expand Down
167 changes: 167 additions & 0 deletions libdd-trace-stats/src/span_concentrator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,173 @@ fn test_peer_tags_aggregation() {
);
}

/// Checks peer-tag selection for spans carrying `_dd.base_service`: spans without a span kind are
/// grouped by that tag alone, while a client span keeps using the configured peer tag keys.
#[test]
fn test_base_service_peer_tag() {
    let now = SystemTime::now();
    let mut input_spans = vec![
        // No span kind and no `_dd.base_service`: expected to end up without peer tags
        get_test_span_with_meta(
            now,
            1,
            0,
            100,
            5,
            "A1",
            "internal.operation",
            0,
            &[],
            &[("_dd.measured", 1.0)],
        ),
        // No span kind, `_dd.base_service` set: the base service becomes the peer tag
        get_test_span_with_meta(
            now,
            2,
            0,
            75,
            5,
            "A1",
            "internal.with.base.service",
            0,
            &[("_dd.base_service", "original-service")],
            &[("_dd.measured", 1.0)],
        ),
        // Same resource and same `_dd.base_service` as the previous span: same group
        get_test_span_with_meta(
            now,
            3,
            0,
            50,
            5,
            "A1",
            "internal.with.base.service",
            0,
            &[("_dd.base_service", "original-service")],
            &[("_dd.measured", 1.0)],
        ),
        // Same resource but another `_dd.base_service`: must land in its own group
        get_test_span_with_meta(
            now,
            4,
            0,
            60,
            5,
            "A1",
            "internal.with.base.service",
            0,
            &[("_dd.base_service", "other-service")],
            &[("_dd.measured", 1.0)],
        ),
        // Client span that also has `_dd.base_service`: the configured peer tag keys win and
        // the base service is not used
        get_test_span_with_meta(
            now,
            5,
            0,
            80,
            5,
            "A1",
            "SELECT * FROM users",
            0,
            &[
                ("span.kind", "client"),
                ("_dd.base_service", "ignored-for-client"),
                ("db.instance", "i-1234"),
                ("db.system", "postgres"),
            ],
            &[("_dd.measured", 1.0)],
        ),
    ];
    compute_top_level_span(&mut input_spans[..]);

    let peer_tag_keys: Vec<String> = ["db.instance", "db.system"]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    let mut concentrator = SpanConcentrator::new(
        Duration::from_nanos(BUCKET_SIZE),
        now,
        get_span_kinds(),
        peer_tag_keys,
    );
    for span in input_spans.iter() {
        concentrator.add_span(span);
    }

    // Flush far enough in the future that the whole buffer is past its bucket window
    let buffered_nanos = concentrator.bucket_size * concentrator.buffer_len as u64;
    let flush_at = now + Duration::from_nanos(buffered_nanos);

    // Fields shared by every expected group; each entry below only overrides what differs
    let common = pb::ClientGroupedStats {
        service: "A1".to_string(),
        r#type: "db".to_string(),
        name: "query".to_string(),
        errors: 0,
        is_trace_root: pb::Trilean::True.into(),
        ..Default::default()
    };
    let expected_groups = vec![
        // Span without `_dd.base_service`: no peer tags
        pb::ClientGroupedStats {
            resource: "internal.operation".to_string(),
            duration: 100,
            hits: 1,
            top_level_hits: 1,
            ..common.clone()
        },
        // The two "original-service" spans, merged under the base service peer tag
        pb::ClientGroupedStats {
            resource: "internal.with.base.service".to_string(),
            peer_tags: vec!["_dd.base_service:original-service".to_string()],
            duration: 125,
            hits: 2,
            top_level_hits: 2,
            ..common.clone()
        },
        // The "other-service" span, kept apart from the group above
        pb::ClientGroupedStats {
            resource: "internal.with.base.service".to_string(),
            peer_tags: vec!["_dd.base_service:other-service".to_string()],
            duration: 60,
            hits: 1,
            top_level_hits: 1,
            ..common.clone()
        },
        // Client span: configured peer tags only, no base service
        pb::ClientGroupedStats {
            resource: "SELECT * FROM users".to_string(),
            span_kind: "client".to_string(),
            peer_tags: vec![
                "db.instance:i-1234".to_string(),
                "db.system:postgres".to_string(),
            ],
            duration: 80,
            hits: 1,
            top_level_hits: 1,
            ..common
        },
    ];

    let flushed = concentrator.flush(flush_at, false);
    let first_bucket = flushed
        .first()
        .expect("There should be at least one time bucket");
    assert_counts_equal(expected_groups, first_bucket.stats.clone());
}

#[test]
fn test_compute_stats_for_span_kind() {
let test_cases: Vec<(SpanSlice, bool)> = vec![
Expand Down
Loading