From 1ca2eabe26da9ced9533d92db34827544ee888ef Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 20 Apr 2026 15:49:38 +0200 Subject: [PATCH 01/13] feat: add encoder from v04 to v1 --- libdd-data-pipeline/src/trace_exporter/mod.rs | 2 + .../src/trace_exporter/trace_serializer.rs | 32 +- libdd-trace-utils/src/msgpack_encoder/mod.rs | 22 ++ .../src/msgpack_encoder/v1/mod.rs | 340 ++++++++++++++++++ .../src/msgpack_encoder/v1/span_v04.rs | 310 ++++++++++++++++ libdd-trace-utils/src/tracer_payload.rs | 5 + 6 files changed, 704 insertions(+), 7 deletions(-) create mode 100644 libdd-trace-utils/src/msgpack_encoder/v1/mod.rs create mode 100644 libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 180193f507..fca7c3e340 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -74,6 +74,7 @@ pub enum TraceExporterOutputFormat { #[default] V04, V05, + V1, } impl TraceExporterOutputFormat { @@ -84,6 +85,7 @@ impl TraceExporterOutputFormat { match self { TraceExporterOutputFormat::V04 => "/v0.4/traces", TraceExporterOutputFormat::V05 => "/v0.5/traces", + TraceExporterOutputFormat::V1 => "/v1.0/traces", }, ) } diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 2a257c7e82..94abb09902 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -67,13 +67,30 @@ impl<'a> TraceSerializer<'a> { &self, traces: Vec>>, ) -> Result, TraceExporterError> { - let use_v05_format = match self.output_format { - TraceExporterOutputFormat::V05 => true, - TraceExporterOutputFormat::V04 => false, - }; - trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - }) + match self.output_format { + TraceExporterOutputFormat::V1 => { + // For V1, collect as V04 spans and wrap in the V1 variant so the + // serializer knows to use the V1 msgpack encoder. + let chunks = + trace_utils::collect_trace_chunks(traces, false).map_err(|e| { + TraceExporterError::Deserialization(DecodeError::InvalidFormat( + e.to_string(), + )) + })?; + match chunks { + tracer_payload::TraceChunks::V04(traces) => { + Ok(tracer_payload::TraceChunks::V1(traces)) + } + other => Ok(other), + } + } + format => { + let use_v05_format = matches!(format, TraceExporterOutputFormat::V05); + trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| { + TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) + }) + } + } } /// Build HTTP headers for traces request @@ -101,6 +118,7 @@ impl<'a> TraceSerializer<'a> { tracer_payload::TraceChunks::V05(p) => { rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization) } + tracer_payload::TraceChunks::V1(p) => Ok(msgpack_encoder::v1::to_vec(p)), } } } diff --git a/libdd-trace-utils/src/msgpack_encoder/mod.rs b/libdd-trace-utils/src/msgpack_encoder/mod.rs index 876d09627d..d8c9a11d3e 100644 --- a/libdd-trace-utils/src/msgpack_encoder/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/mod.rs @@ -2,3 +2,25 @@ // SPDX-License-Identifier: Apache-2.0 pub mod v04; +pub mod v1; + +pub(crate) struct CountLength(u32); + +impl std::io::Write for CountLength { + #[inline] + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.write_all(buf)?; + Ok(buf.len()) + } + + #[inline] + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + + #[inline] + fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { + self.0 += buf.len() as u32; + Ok(()) + } +} diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs new file mode 100644 index 0000000000..f63f9d1032 --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -0,0 +1,340 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod span_v04; + +use crate::span::v04::Span; +use crate::span::TraceData; +use rmp::encode::{ + write_array_len, write_bin, write_map_len, write_sint, write_str, write_uint, write_uint8, + ByteBuf, RmpWrite, ValueWriteError, +}; +use std::borrow::Borrow; +use std::collections::HashMap; + +/// Integer keys for the top-level V1 trace payload map. +#[repr(u8)] +enum TraceKey { + Chunks = 11, +} + +/// Integer keys for V1 chunk-level fields. +#[repr(u8)] +enum ChunkKey { + Priority = 1, + Origin = 2, + Spans = 4, + TraceId = 6, +} + +/// Streaming string intern table. +/// +/// The first time a string is written, it is emitted as a msgpack `str` and assigned an +/// incrementing integer ID. On subsequent occurrences only the ID is emitted as a msgpack `uint`. +/// ID 0 is reserved for the empty string (written as fixint `0`). +/// +/// The string table is scoped per payload: each `to_vec` / `write_to_slice` call starts with a +/// fresh table so deduplication is payload-local. +pub(crate) struct StringTable { + seen: HashMap, + next_id: u32, +} + +impl StringTable { + fn new() -> Self { + Self { + seen: HashMap::new(), + next_id: 1, + } + } + + /// Writes `s` to `writer` using string interning. + /// + /// - Empty string → fixint `0` + /// - First occurrence of `s` → msgpack `str`, ID recorded for future references + /// - Subsequent occurrence → msgpack `uint` carrying the previously assigned ID + pub(crate) fn write_interned>( + &mut self, + writer: &mut W, + s: S, + ) -> Result<(), ValueWriteError> { + let s = s.as_ref(); + if s.is_empty() { + write_uint8(writer, 0)?; + return Ok(()); + } + if let Some(&id) = self.seen.get(s) { + write_uint(writer, id as u64)?; + } else { + let id = self.next_id; + self.next_id += 1; + self.seen.insert(s.to_string(), id); + write_str(writer, s)?; + } + Ok(()) + } +} + + +/// Promoted fields extracted from spans and written at the chunk level. +struct ChunkAttrs { + /// Full 128-bit trace ID (encodes as 16-byte big-endian binary). + trace_id: u128, + /// Sampling priority from `_sampling_priority_v1` metric on the root span. + sampling_priority: Option, + /// Origin tag from `_dd.origin` meta on the root span. + origin: Option, +} + +fn extract_chunk_attrs(spans: &[Span]) -> ChunkAttrs { + let mut trace_id = 0u128; + let mut sampling_priority = None; + let mut origin: Option = None; + + for span in spans { + // Any span gives us the trace_id. + trace_id = span.trace_id; + + // Chunk-level attributes come from the root span (parent_id == 0). + if span.parent_id == 0 { + // HashMap::get accepts &Q where K: Borrow; T::Text: Borrow so &str works. + if let Some(v) = span.metrics.get("_sampling_priority_v1") { + sampling_priority = Some(*v as i32); + } + if let Some(v) = span.meta.get("_dd.origin") { + origin = Some(v.borrow().to_owned()); + } + } + } + + ChunkAttrs { + trace_id, + sampling_priority, + origin, + } +} + +/// Encodes all traces as a V1 msgpack payload. +/// +/// Top-level format: +/// ```text +/// Map { +/// TraceKey::Chunks (11) → Array[Chunk, ...] +/// } +/// ``` +fn encode_payload]>>( + writer: &mut W, + traces: &[S], +) -> Result<(), ValueWriteError> { + let mut table = StringTable::new(); + + // Top-level map contains only the chunks array for now. + write_map_len(writer, 1)?; + write_uint8(writer, TraceKey::Chunks as u8)?; + + write_array_len(writer, traces.len() as u32)?; + for trace in traces { + encode_chunk(writer, trace.as_ref(), &mut table)?; + } + + Ok(()) +} + +/// Encodes one chunk (a group of spans sharing a trace ID). +/// +/// ```text +/// Map { +/// ChunkKey::TraceId (6) → bin[16] // 128-bit big-endian +/// ChunkKey::Origin (2) → str|uint // optional, interned +/// ChunkKey::Priority (1) → int // optional +/// ChunkKey::Spans (4) → Array[Span, ...] +/// } +/// ``` +fn encode_chunk( + writer: &mut W, + spans: &[Span], + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + let attrs = extract_chunk_attrs(spans); + + let fields = 2u32 // trace_id + spans are always present + + attrs.origin.is_some() as u32 + + attrs.sampling_priority.is_some() as u32; + + write_map_len(writer, fields)?; + + // 128-bit trace ID as 16-byte big-endian binary. + write_uint8(writer, ChunkKey::TraceId as u8)?; + write_bin(writer, &attrs.trace_id.to_be_bytes())?; + + if let Some(ref origin) = attrs.origin { + write_uint8(writer, ChunkKey::Origin as u8)?; + table.write_interned(writer, origin.as_str())?; + } + + if let Some(priority) = attrs.sampling_priority { + write_uint8(writer, ChunkKey::Priority as u8)?; + write_sint(writer, priority as i64)?; + } + + write_uint8(writer, ChunkKey::Spans as u8)?; + write_array_len(writer, spans.len() as u32)?; + for span in spans { + span_v04::encode_span(writer, span, table)?; + } + + Ok(()) +} + +/// Serializes traces into a slice using the V1 msgpack format. +/// +/// # Errors +/// Returns a `ValueWriteError` if the underlying writer fails. +pub fn write_to_slice]>>( + slice: &mut &mut [u8], + traces: &[S], +) -> Result<(), ValueWriteError> { + encode_payload(slice, traces) +} + +/// Serializes traces into a `Vec` using the V1 msgpack format. +pub fn to_vec]>>(traces: &[S]) -> Vec { + to_vec_with_capacity(traces, 0) +} + +/// Serializes traces into a `Vec` with a pre-allocated capacity. +pub fn to_vec_with_capacity]>>( + traces: &[S], + capacity: u32, +) -> Vec { + let mut buf = ByteBuf::with_capacity(capacity as usize); + #[allow(clippy::expect_used)] + encode_payload(&mut buf, traces) + .expect("infallible: the error is std::convert::Infallible"); + buf.into_vec() +} + +/// Returns the number of bytes the V1 payload for `traces` would occupy. +pub fn to_len]>>(traces: &[S]) -> u32 { + let mut counter = super::CountLength(0); + #[allow(clippy::expect_used)] + encode_payload(&mut counter, traces) + .expect("infallible: CountLength never fails"); + counter.0 +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::span::v04::SpanBytes; + use libdd_tinybytes::BytesString; + use std::collections::HashMap; + + fn make_span( + service: &str, + name: &str, + trace_id: u128, + span_id: u64, + parent_id: u64, + ) -> SpanBytes { + SpanBytes { + service: BytesString::from_slice(service.as_bytes()).unwrap(), + name: BytesString::from_slice(name.as_bytes()).unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id, + span_id, + parent_id, + start: 1_000_000, + duration: 500, + ..Default::default() + } + } + + #[test] + fn test_to_vec_non_empty() { + let spans = vec![make_span("svc", "op", 42, 1, 0)]; + let traces = vec![spans]; + let encoded = to_vec(&traces); + assert!(!encoded.is_empty()); + } + + #[test] + fn test_to_vec_empty_traces() { + let traces: Vec> = vec![]; + let encoded = to_vec(&traces); + // Must still produce a valid msgpack map with an empty chunks array. + assert!(!encoded.is_empty()); + } + + #[test] + fn test_string_interning_reduces_size() { + // Two spans with the same service name — second occurrence should use the integer ID. + let s1 = make_span("my-service", "op1", 1, 1, 0); + let s2 = make_span("my-service", "op2", 2, 2, 0); + let traces_two = vec![vec![s1], vec![s2]]; + + // Single span for baseline. + let s_single = make_span("my-service", "op1", 1, 1, 0); + let traces_single = vec![vec![s_single]]; + + let encoded_two = to_vec(&traces_two); + let encoded_single = to_vec(&traces_single); + + // The two-trace payload should be less than 2× the single-trace payload + // if interning is working (the second "my-service" is encoded as an integer). + assert!( + encoded_two.len() < 2 * encoded_single.len(), + "Interning should reduce size: two={} single={}", + encoded_two.len(), + encoded_single.len() + ); + } + + #[test] + fn test_chunk_level_attrs_origin_and_priority() { + let mut meta = HashMap::new(); + meta.insert( + BytesString::from_static("_dd.origin"), + BytesString::from_static("lambda"), + ); + let mut metrics = HashMap::new(); + metrics.insert(BytesString::from_static("_sampling_priority_v1"), 1.0f64); + + let root = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 99, + span_id: 1, + parent_id: 0, + start: 1000, + duration: 100, + meta, + metrics, + ..Default::default() + }; + + let encoded = to_vec(&[vec![root]]); + assert!(!encoded.is_empty()); + // The payload must contain "lambda" somewhere (the origin string). + let lambda_bytes = b"lambda"; + assert!( + encoded + .windows(lambda_bytes.len()) + .any(|w| w == lambda_bytes), + "origin 'lambda' should appear in payload" + ); + } + + #[test] + fn test_to_len_matches_to_vec() { + let spans = vec![ + make_span("svc", "op", 1, 1, 0), + make_span("svc", "child", 1, 2, 1), + ]; + let traces = vec![spans]; + let encoded = to_vec(&traces); + let len = to_len(&traces); + assert_eq!(encoded.len() as u32, len); + } +} diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs new file mode 100644 index 0000000000..c5e5b231b4 --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -0,0 +1,310 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; +use crate::span::TraceData; +use rmp::encode::{ + write_bin, write_bool, write_f64, write_i64, write_sint, write_u64, write_uint, write_uint8, + RmpWrite, ValueWriteError, +}; +use std::borrow::Borrow; + +use super::StringTable; + +/// Integer keys for V1 span fields. +#[repr(u8)] +pub(super) enum SpanKey { + Service = 1, + Name = 2, + Resource = 3, + SpanId = 4, + ParentId = 5, + Start = 6, + Duration = 7, + Error = 8, + Attributes = 9, + Type = 10, + SpanLinks = 11, + SpanEvents = 12, +} + +/// Integer keys for V1 span link fields. +#[repr(u8)] +pub(super) enum SpanLinkKey { + TraceId = 1, + SpanId = 2, + Attributes = 3, + TraceState = 4, + Flags = 5, +} + +/// Integer keys for V1 span event fields. +#[repr(u8)] +pub(super) enum SpanEventKey { + Time = 1, + Name = 2, + Attributes = 3, +} + +/// Type discriminants for attribute values. +/// An attribute value is encoded as [type_uint8][actual_value]. +#[repr(u8)] +pub(super) enum AnyValueKey { + String = 1, + Bool = 2, + Double = 3, + Bytes = 5, +} + +/// Encodes span links into the V1 format. +/// +/// Uses integer keys and string interning for string values. Each span link's +/// trace ID is encoded as a 16-byte big-endian binary. +pub fn encode_span_links( + writer: &mut W, + span_links: &[SpanLink], + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + write_uint8(writer, SpanKey::SpanLinks as u8)?; + rmp::encode::write_array_len(writer, span_links.len() as u32)?; + + for link in span_links.iter() { + let trace_id_128 = ((link.trace_id_high as u128) << 64) | link.trace_id as u128; + let link_len = 1 /* trace_id (always) */ + + (link.span_id != 0) as u32 + + (!link.attributes.is_empty()) as u32 + + (!link.tracestate.borrow().is_empty()) as u32 + + (link.flags != 0) as u32; + + rmp::encode::write_map_len(writer, link_len)?; + + write_uint8(writer, SpanLinkKey::TraceId as u8)?; + write_bin(writer, &trace_id_128.to_be_bytes())?; + + if link.span_id != 0 { + write_uint8(writer, SpanLinkKey::SpanId as u8)?; + write_u64(writer, link.span_id)?; + } + + if !link.tracestate.borrow().is_empty() { + write_uint8(writer, SpanLinkKey::TraceState as u8)?; + table.write_interned(writer, link.tracestate.borrow())?; + } + + if link.flags != 0 { + write_uint8(writer, SpanLinkKey::Flags as u8)?; + write_uint(writer, link.flags as u64)?; + } + + if !link.attributes.is_empty() { + write_uint8(writer, SpanLinkKey::Attributes as u8)?; + rmp::encode::write_map_len(writer, link.attributes.len() as u32)?; + for (k, v) in link.attributes.iter() { + table.write_interned(writer, k.borrow())?; + write_uint8(writer, AnyValueKey::String as u8)?; + table.write_interned(writer, v.borrow())?; + } + } + } + + Ok(()) +} + +/// Encodes span events into the V1 format. +/// +/// Uses integer keys and string interning. Attribute values are type-tagged. +pub fn encode_span_events( + writer: &mut W, + span_events: &[SpanEvent], + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + write_uint8(writer, SpanKey::SpanEvents as u8)?; + rmp::encode::write_array_len(writer, span_events.len() as u32)?; + + for event in span_events.iter() { + let event_len = 2 /* time_unix_nano, name */ + + (!event.attributes.is_empty()) as u32; + + rmp::encode::write_map_len(writer, event_len)?; + + write_uint8(writer, SpanEventKey::Time as u8)?; + write_u64(writer, event.time_unix_nano)?; + + write_uint8(writer, SpanEventKey::Name as u8)?; + table.write_interned(writer, event.name.borrow())?; + + if !event.attributes.is_empty() { + write_uint8(writer, SpanEventKey::Attributes as u8)?; + encode_span_event_attributes(writer, event, table)?; + } + } + + Ok(()) +} + +fn encode_span_event_attributes( + writer: &mut W, + event: &SpanEvent, + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + rmp::encode::write_map_len(writer, event.attributes.len() as u32)?; + for (k, attribute) in event.attributes.iter() { + table.write_interned(writer, k.borrow())?; + encode_attribute_any_value(writer, attribute, table)?; + } + Ok(()) +} + +fn encode_attribute_any_value( + writer: &mut W, + attribute: &AttributeAnyValue, + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + fn encode_array_element( + writer: &mut W, + value: &AttributeArrayValue, + table: &mut StringTable, + ) -> Result<(), ValueWriteError> { + match value { + AttributeArrayValue::String(s) => { + write_uint8(writer, AnyValueKey::String as u8)?; + table.write_interned(writer, s.borrow())?; + } + AttributeArrayValue::Boolean(b) => { + write_uint8(writer, AnyValueKey::Bool as u8)?; + write_bool(writer, *b).map_err(ValueWriteError::InvalidDataWrite)?; + } + AttributeArrayValue::Integer(i) => { + write_uint8(writer, 4u8)?; // Int64 + write_sint(writer, *i)?; + } + AttributeArrayValue::Double(d) => { + write_uint8(writer, AnyValueKey::Double as u8)?; + write_f64(writer, *d)?; + } + } + Ok(()) + } + + match attribute { + AttributeAnyValue::SingleValue(value) => { + encode_array_element(writer, value, table)?; + } + AttributeAnyValue::Array(array) => { + write_uint8(writer, 6u8)?; // Array + rmp::encode::write_array_len(writer, array.len() as u32)?; + for v in array.iter() { + encode_array_element(writer, v, table)?; + } + } + } + Ok(()) +} + +/// Encodes a v0.4 span into the V1 msgpack format. +/// +/// Key differences from v0.4: +/// - Uses integer keys for all fields. +/// - `meta` and `metrics` are combined into a single `attributes` map with type-tagged values. +/// - `meta_struct` bytes are included in `attributes` as `Bytes` values. +/// - `trace_id` is not encoded in the span (it belongs to the chunk). +/// - `error` is encoded as a boolean. +/// - String values use streaming string interning via `StringTable`. +#[inline(always)] +pub fn encode_span( + writer: &mut W, + span: &Span, + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + let has_attributes = + !span.meta.is_empty() || !span.metrics.is_empty() || !span.meta_struct.is_empty(); + let span_len = 2 /* span_id, start — always present */ + + (!span.service.borrow().is_empty()) as u32 + + (!span.name.borrow().is_empty()) as u32 + + (!span.resource.borrow().is_empty()) as u32 + + (!span.r#type.borrow().is_empty()) as u32 + + (span.parent_id != 0) as u32 + + (span.duration != 0) as u32 + + (span.error != 0) as u32 + + has_attributes as u32 + + (!span.span_links.is_empty()) as u32 + + (!span.span_events.is_empty()) as u32; + + rmp::encode::write_map_len(writer, span_len)?; + + if !span.service.borrow().is_empty() { + write_uint8(writer, SpanKey::Service as u8)?; + table.write_interned(writer, span.service.borrow())?; + } + + if !span.name.borrow().is_empty() { + write_uint8(writer, SpanKey::Name as u8)?; + table.write_interned(writer, span.name.borrow())?; + } + + if !span.resource.borrow().is_empty() { + write_uint8(writer, SpanKey::Resource as u8)?; + table.write_interned(writer, span.resource.borrow())?; + } + + write_uint8(writer, SpanKey::SpanId as u8)?; + write_u64(writer, span.span_id)?; + + write_uint8(writer, SpanKey::Start as u8)?; + write_i64(writer, span.start)?; + + if span.parent_id != 0 { + write_uint8(writer, SpanKey::ParentId as u8)?; + write_u64(writer, span.parent_id)?; + } + + if span.duration != 0 { + write_uint8(writer, SpanKey::Duration as u8)?; + write_i64(writer, span.duration)?; + } + + if span.error != 0 { + write_uint8(writer, SpanKey::Error as u8)?; + write_bool(writer, span.error != 0).map_err(ValueWriteError::InvalidDataWrite)?; + } + + if !span.r#type.borrow().is_empty() { + write_uint8(writer, SpanKey::Type as u8)?; + table.write_interned(writer, span.r#type.borrow())?; + } + + if has_attributes { + let attr_count = + span.meta.len() as u32 + span.metrics.len() as u32 + span.meta_struct.len() as u32; + write_uint8(writer, SpanKey::Attributes as u8)?; + rmp::encode::write_map_len(writer, attr_count)?; + + for (k, v) in span.meta.iter() { + table.write_interned(writer, k.borrow())?; + write_uint8(writer, AnyValueKey::String as u8)?; + table.write_interned(writer, v.borrow())?; + } + + for (k, v) in span.metrics.iter() { + table.write_interned(writer, k.borrow())?; + write_uint8(writer, AnyValueKey::Double as u8)?; + write_f64(writer, *v)?; + } + + for (k, v) in span.meta_struct.iter() { + table.write_interned(writer, k.borrow())?; + write_uint8(writer, AnyValueKey::Bytes as u8)?; + write_bin(writer, v.borrow())?; + } + } + + if !span.span_links.is_empty() { + encode_span_links(writer, &span.span_links, table)?; + } + + if !span.span_events.is_empty() { + encode_span_events(writer, &span.span_events, table)?; + } + + Ok(()) +} diff --git a/libdd-trace-utils/src/tracer_payload.rs b/libdd-trace-utils/src/tracer_payload.rs index 5649ed7c74..79e603ba75 100644 --- a/libdd-trace-utils/src/tracer_payload.rs +++ b/libdd-trace-utils/src/tracer_payload.rs @@ -27,6 +27,8 @@ pub enum TraceChunks { V04(Vec>>), /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDict, Vec>)), + /// Collection of v0.4 spans to be serialized as a V1 msgpack payload. + V1(Vec>>), } impl TraceChunks { @@ -34,6 +36,8 @@ impl TraceChunks { match self { TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces), + // V1 uses the same underlying span structure as V04. + TraceChunks::V1(traces) => TracerPayloadCollection::V04(traces), } } } @@ -44,6 +48,7 @@ impl TraceChunks { match self { TraceChunks::V04(traces) => traces.len(), TraceChunks::V05((_, traces)) => traces.len(), + TraceChunks::V1(traces) => traces.len(), } } } From 01e38e3b83c7c84c7d96f209c58111787567af48 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 20 Apr 2026 16:15:11 +0200 Subject: [PATCH 02/13] fix: fmt --- .../src/trace_exporter/trace_serializer.rs | 9 +++------ libdd-trace-utils/src/msgpack_encoder/v1/mod.rs | 7 ++----- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 94abb09902..25372b2c9c 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -71,12 +71,9 @@ impl<'a> TraceSerializer<'a> { TraceExporterOutputFormat::V1 => { // For V1, collect as V04 spans and wrap in the V1 variant so the // serializer knows to use the V1 msgpack encoder. - let chunks = - trace_utils::collect_trace_chunks(traces, false).map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat( - e.to_string(), - )) - })?; + let chunks = trace_utils::collect_trace_chunks(traces, false).map_err(|e| { + TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) + })?; match chunks { tracer_payload::TraceChunks::V04(traces) => { Ok(tracer_payload::TraceChunks::V1(traces)) diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index f63f9d1032..dc4ff89b61 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -75,7 +75,6 @@ impl StringTable { } } - /// Promoted fields extracted from spans and written at the chunk level. struct ChunkAttrs { /// Full 128-bit trace ID (encodes as 16-byte big-endian binary). @@ -209,8 +208,7 @@ pub fn to_vec_with_capacity]>>( ) -> Vec { let mut buf = ByteBuf::with_capacity(capacity as usize); #[allow(clippy::expect_used)] - encode_payload(&mut buf, traces) - .expect("infallible: the error is std::convert::Infallible"); + encode_payload(&mut buf, traces).expect("infallible: the error is std::convert::Infallible"); buf.into_vec() } @@ -218,8 +216,7 @@ pub fn to_vec_with_capacity]>>( pub fn to_len]>>(traces: &[S]) -> u32 { let mut counter = super::CountLength(0); #[allow(clippy::expect_used)] - encode_payload(&mut counter, traces) - .expect("infallible: CountLength never fails"); + encode_payload(&mut counter, traces).expect("infallible: CountLength never fails"); counter.0 } From 21cf191c24a003890dcb4479fe019e0c766d8055 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Thu, 30 Apr 2026 15:54:55 +0200 Subject: [PATCH 03/13] fix: address comments --- .../src/trace_exporter/builder.rs | 4 +- .../src/trace_exporter/trace_serializer.rs | 2 +- .../src/msgpack_encoder/v04/mod.rs | 29 +-- .../src/msgpack_encoder/v1/mod.rs | 241 ++++++++++++++---- .../src/msgpack_encoder/v1/span_v04.rs | 13 +- libdd-trace-utils/src/send_data/mod.rs | 2 +- 6 files changed, 202 insertions(+), 89 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 29c3d78657..62775d45a4 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -510,7 +510,9 @@ impl TraceExporterBuilder { match input { TraceExporterInputFormat::V04 => matches!( output, - TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V05 + TraceExporterOutputFormat::V04 + | TraceExporterOutputFormat::V05 + | TraceExporterOutputFormat::V1 ), TraceExporterInputFormat::V05 => matches!(output, TraceExporterOutputFormat::V05), } diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 25372b2c9c..2d1c3fcc2d 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -78,7 +78,7 @@ impl<'a> TraceSerializer<'a> { tracer_payload::TraceChunks::V04(traces) => { Ok(tracer_payload::TraceChunks::V1(traces)) } - other => Ok(other), + _ => unreachable!(), } } format => { diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs index 97cf222268..1c4e0ec3af 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs @@ -128,27 +128,6 @@ pub fn to_vec_with_capacity]>>( buf.into_vec() } -struct CountLength(u32); - -impl std::io::Write for CountLength { - #[inline] - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.write_all(buf)?; - Ok(buf.len()) - } - - #[inline] - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } - - #[inline] - fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { - self.0 += buf.len() as u32; - Ok(()) - } -} - /// Computes the number of bytes required to encode the given traces. /// /// This does not allocate any actual buffer, but simulates writing in order to measure @@ -165,7 +144,7 @@ impl std::io::Write for CountLength { /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v04::to_len; +/// use libdd_trace_utils::msgpack_encoder::v04::to_encoded_byte_len; /// use libdd_trace_utils::span::v04::SpanSlice; /// /// let span = SpanSlice { @@ -173,12 +152,12 @@ impl std::io::Write for CountLength { /// ..Default::default() /// }; /// let traces = vec![vec![span]]; -/// let encoded_len = to_len(&traces); +/// let encoded_len = to_encoded_byte_len(&traces); /// /// assert!(encoded_len > 0); /// ``` -pub fn to_len]>>(traces: &[S]) -> u32 { - let mut counter = CountLength(0); +pub fn to_encoded_byte_len]>>(traces: &[S]) -> u32 { + let mut counter = super::CountLength(0); #[allow(clippy::expect_used)] to_writer(&mut counter, traces).expect("infallible: CountLength never fails"); counter.0 diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index dc4ff89b61..bb503e8652 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -13,44 +13,44 @@ use std::borrow::Borrow; use std::collections::HashMap; /// Integer keys for the top-level V1 trace payload map. -#[repr(u8)] -enum TraceKey { - Chunks = 11, +mod trace_key { + pub const ENV_REF: u8 = 7; + pub const HOSTNAME_REF: u8 = 8; + pub const APP_VERSION_REF: u8 = 9; + pub const CHUNKS: u8 = 11; } /// Integer keys for V1 chunk-level fields. -#[repr(u8)] -enum ChunkKey { - Priority = 1, - Origin = 2, - Spans = 4, - TraceId = 6, +mod chunk_key { + pub const PRIORITY: u8 = 1; + pub const ORIGIN: u8 = 2; + pub const SPANS: u8 = 4; + pub const TRACE_ID: u8 = 6; + /// Sampling mechanism (previously the `_dd.p.dm` span tag). + pub const SAMPLING_MECHANISM: u8 = 7; } /// Streaming string intern table. /// /// The first time a string is written, it is emitted as a msgpack `str` and assigned an /// incrementing integer ID. On subsequent occurrences only the ID is emitted as a msgpack `uint`. -/// ID 0 is reserved for the empty string (written as fixint `0`). +/// ID 0 is reserved for the empty string (pre-inserted in the constructor). /// /// The string table is scoped per payload: each `to_vec` / `write_to_slice` call starts with a /// fresh table so deduplication is payload-local. pub(crate) struct StringTable { seen: HashMap, - next_id: u32, } impl StringTable { fn new() -> Self { - Self { - seen: HashMap::new(), - next_id: 1, - } + let mut seen = HashMap::new(); + seen.insert(String::new(), 0); + Self { seen } } /// Writes `s` to `writer` using string interning. /// - /// - Empty string → fixint `0` /// - First occurrence of `s` → msgpack `str`, ID recorded for future references /// - Subsequent occurrence → msgpack `uint` carrying the previously assigned ID pub(crate) fn write_interned>( @@ -59,15 +59,10 @@ impl StringTable { s: S, ) -> Result<(), ValueWriteError> { let s = s.as_ref(); - if s.is_empty() { - write_uint8(writer, 0)?; - return Ok(()); - } if let Some(&id) = self.seen.get(s) { write_uint(writer, id as u64)?; } else { - let id = self.next_id; - self.next_id += 1; + let id = self.seen.len() as u32; self.seen.insert(s.to_string(), id); write_str(writer, s)?; } @@ -75,34 +70,83 @@ impl StringTable { } } +/// Promoted fields extracted from the payload's spans, written at the top-level map. +struct PayloadAttrs<'a> { + env: Option<&'a str>, + hostname: Option<&'a str>, + app_version: Option<&'a str>, +} + +fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span]>>(traces: &'a [S]) -> PayloadAttrs<'a> +where + T::Text: 'a, +{ + let mut env = None; + let mut hostname = None; + let mut app_version = None; + + 'outer: for trace in traces { + for span in trace.as_ref() { + if env.is_none() { + env = span.meta.get("env").map(|v| v.borrow()); + } + if hostname.is_none() { + hostname = span.meta.get("_dd.hostname").map(|v| v.borrow()); + } + if app_version.is_none() { + app_version = span.meta.get("version").map(|v| v.borrow()); + } + if env.is_some() && hostname.is_some() && app_version.is_some() { + break 'outer; + } + } + } + + PayloadAttrs { env, hostname, app_version } +} + /// Promoted fields extracted from spans and written at the chunk level. -struct ChunkAttrs { +struct ChunkAttrs<'a> { /// Full 128-bit trace ID (encodes as 16-byte big-endian binary). trace_id: u128, /// Sampling priority from `_sampling_priority_v1` metric on the root span. sampling_priority: Option, /// Origin tag from `_dd.origin` meta on the root span. - origin: Option, + origin: Option<&'a str>, + /// Sampling mechanism from `_dd.p.dm` meta on the root span. + sampling_mechanism: Option, } -fn extract_chunk_attrs(spans: &[Span]) -> ChunkAttrs { +fn extract_chunk_attrs<'a, T: TraceData>(spans: &'a [Span]) -> ChunkAttrs<'a> +where + T::Text: 'a, +{ let mut trace_id = 0u128; let mut sampling_priority = None; - let mut origin: Option = None; + let mut origin = None; + let mut sampling_mechanism = None; for span in spans { - // Any span gives us the trace_id. trace_id = span.trace_id; - // Chunk-level attributes come from the root span (parent_id == 0). - if span.parent_id == 0 { - // HashMap::get accepts &Q where K: Borrow; T::Text: Borrow so &str works. + // Root span: either no parent in this chunk, or tagged _dd.top_level=1 (remote parent). + let is_root = span.parent_id == 0 + || span.metrics.get("_dd.top_level").copied().unwrap_or(0.0) == 1.0; + + if is_root { if let Some(v) = span.metrics.get("_sampling_priority_v1") { sampling_priority = Some(*v as i32); } if let Some(v) = span.meta.get("_dd.origin") { - origin = Some(v.borrow().to_owned()); + origin = Some(v.borrow()); } + // _dd.p.dm is a signed integer sampling mechanism code stored as a string. + if let Some(v) = span.meta.get("_dd.p.dm") { + if let Ok(dm) = v.borrow().parse::() { + sampling_mechanism = Some(dm as u32); + } + } + break; } } @@ -110,6 +154,7 @@ fn extract_chunk_attrs(spans: &[Span]) -> ChunkAttrs { trace_id, sampling_priority, origin, + sampling_mechanism, } } @@ -118,7 +163,10 @@ fn extract_chunk_attrs(spans: &[Span]) -> ChunkAttrs { /// Top-level format: /// ```text /// Map { -/// TraceKey::Chunks (11) → Array[Chunk, ...] +/// trace_key::ENV_REF (7) → str|uint // optional, interned +/// trace_key::HOSTNAME_REF (8) → str|uint // optional, interned +/// trace_key::APP_VERSION (9) → str|uint // optional, interned +/// trace_key::CHUNKS (11) → Array[Chunk, ...] /// } /// ``` fn encode_payload]>>( @@ -126,11 +174,31 @@ fn encode_payload]>>( traces: &[S], ) -> Result<(), ValueWriteError> { let mut table = StringTable::new(); + let payload_attrs = extract_payload_attrs(traces); + + let map_len = 1u32 // chunks always present + + payload_attrs.env.is_some() as u32 + + payload_attrs.hostname.is_some() as u32 + + payload_attrs.app_version.is_some() as u32; - // Top-level map contains only the chunks array for now. - write_map_len(writer, 1)?; - write_uint8(writer, TraceKey::Chunks as u8)?; + write_map_len(writer, map_len)?; + if let Some(env) = payload_attrs.env { + write_uint8(writer, trace_key::ENV_REF)?; + table.write_interned(writer, env)?; + } + + if let Some(hostname) = payload_attrs.hostname { + write_uint8(writer, trace_key::HOSTNAME_REF)?; + table.write_interned(writer, hostname)?; + } + + if let Some(app_version) = payload_attrs.app_version { + write_uint8(writer, trace_key::APP_VERSION_REF)?; + table.write_interned(writer, app_version)?; + } + + write_uint8(writer, trace_key::CHUNKS)?; write_array_len(writer, traces.len() as u32)?; for trace in traces { encode_chunk(writer, trace.as_ref(), &mut table)?; @@ -143,10 +211,11 @@ fn encode_payload]>>( /// /// ```text /// Map { -/// ChunkKey::TraceId (6) → bin[16] // 128-bit big-endian -/// ChunkKey::Origin (2) → str|uint // optional, interned -/// ChunkKey::Priority (1) → int // optional -/// ChunkKey::Spans (4) → Array[Span, ...] +/// chunk_key::TRACE_ID (6) → bin[16] // 128-bit big-endian +/// chunk_key::ORIGIN (2) → str|uint // optional, interned +/// chunk_key::PRIORITY (1) → int // optional +/// chunk_key::SAMPLING_MECHANISM (7) → uint // optional +/// chunk_key::SPANS (4) → Array[Span, ...] /// } /// ``` fn encode_chunk( @@ -158,25 +227,30 @@ fn encode_chunk( let fields = 2u32 // trace_id + spans are always present + attrs.origin.is_some() as u32 - + attrs.sampling_priority.is_some() as u32; + + attrs.sampling_priority.is_some() as u32 + + attrs.sampling_mechanism.is_some() as u32; write_map_len(writer, fields)?; - // 128-bit trace ID as 16-byte big-endian binary. - write_uint8(writer, ChunkKey::TraceId as u8)?; + write_uint8(writer, chunk_key::TRACE_ID)?; write_bin(writer, &attrs.trace_id.to_be_bytes())?; - if let Some(ref origin) = attrs.origin { - write_uint8(writer, ChunkKey::Origin as u8)?; - table.write_interned(writer, origin.as_str())?; + if let Some(origin) = attrs.origin { + write_uint8(writer, chunk_key::ORIGIN)?; + table.write_interned(writer, origin)?; } if let Some(priority) = attrs.sampling_priority { - write_uint8(writer, ChunkKey::Priority as u8)?; + write_uint8(writer, chunk_key::PRIORITY)?; write_sint(writer, priority as i64)?; } - write_uint8(writer, ChunkKey::Spans as u8)?; + if let Some(mechanism) = attrs.sampling_mechanism { + write_uint8(writer, chunk_key::SAMPLING_MECHANISM)?; + write_uint(writer, mechanism as u64)?; + } + + write_uint8(writer, chunk_key::SPANS)?; write_array_len(writer, spans.len() as u32)?; for span in spans { span_v04::encode_span(writer, span, table)?; @@ -190,6 +264,7 @@ fn encode_chunk( /// # Errors /// Returns a `ValueWriteError` if the underlying writer fails. pub fn write_to_slice]>>( + // &mut &mut [u8] lets the caller see the slice shrink as bytes are written. slice: &mut &mut [u8], traces: &[S], ) -> Result<(), ValueWriteError> { @@ -207,16 +282,14 @@ pub fn to_vec_with_capacity]>>( capacity: u32, ) -> Vec { let mut buf = ByteBuf::with_capacity(capacity as usize); - #[allow(clippy::expect_used)] - encode_payload(&mut buf, traces).expect("infallible: the error is std::convert::Infallible"); + let _ = encode_payload(&mut buf, traces); buf.into_vec() } /// Returns the number of bytes the V1 payload for `traces` would occupy. -pub fn to_len]>>(traces: &[S]) -> u32 { +pub fn to_encoded_byte_len]>>(traces: &[S]) -> u32 { let mut counter = super::CountLength(0); - #[allow(clippy::expect_used)] - encode_payload(&mut counter, traces).expect("infallible: CountLength never fails"); + let _ = encode_payload(&mut counter, traces); counter.0 } @@ -324,14 +397,74 @@ mod tests { } #[test] - fn test_to_len_matches_to_vec() { + fn test_to_encoded_byte_len_matches_to_vec() { let spans = vec![ make_span("svc", "op", 1, 1, 0), make_span("svc", "child", 1, 2, 1), ]; let traces = vec![spans]; let encoded = to_vec(&traces); - let len = to_len(&traces); + let len = to_encoded_byte_len(&traces); assert_eq!(encoded.len() as u32, len); } + + #[test] + fn test_remote_parent_root_span_top_level() { + // A span with a non-zero parent_id but _dd.top_level=1.0 is a root in its chunk. + let mut metrics = HashMap::new(); + metrics.insert(BytesString::from_static("_dd.top_level"), 1.0f64); + metrics.insert(BytesString::from_static("_sampling_priority_v1"), 2.0f64); + + let root = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 123, + span_id: 42, + parent_id: 999, // remote parent — not in this chunk + start: 1000, + duration: 100, + metrics, + ..Default::default() + }; + + let encoded = to_vec(&[vec![root]]); + assert!(!encoded.is_empty()); + } + + #[test] + fn test_payload_promoted_fields() { + let mut meta = HashMap::new(); + meta.insert(BytesString::from_static("env"), BytesString::from_static("prod")); + meta.insert(BytesString::from_static("version"), BytesString::from_static("1.2.3")); + meta.insert( + BytesString::from_static("_dd.hostname"), + BytesString::from_static("my-host"), + ); + + let span = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 1, + span_id: 1, + parent_id: 0, + start: 1000, + duration: 100, + meta, + ..Default::default() + }; + + let encoded = to_vec(&[vec![span]]); + let prod_bytes = b"prod"; + assert!( + encoded.windows(prod_bytes.len()).any(|w| w == prod_bytes), + "env 'prod' should appear in payload" + ); + let host_bytes = b"my-host"; + assert!( + encoded.windows(host_bytes.len()).any(|w| w == host_bytes), + "hostname 'my-host' should appear in payload" + ); + } } diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index c5e5b231b4..dca475d758 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -68,9 +68,9 @@ pub fn encode_span_links( write_uint8(writer, SpanKey::SpanLinks as u8)?; rmp::encode::write_array_len(writer, span_links.len() as u32)?; - for link in span_links.iter() { + for link in span_links { let trace_id_128 = ((link.trace_id_high as u128) << 64) | link.trace_id as u128; - let link_len = 1 /* trace_id (always) */ + let link_len = 1 // trace_id (always) + (link.span_id != 0) as u32 + (!link.attributes.is_empty()) as u32 + (!link.tracestate.borrow().is_empty()) as u32 @@ -121,8 +121,8 @@ pub fn encode_span_events( write_uint8(writer, SpanKey::SpanEvents as u8)?; rmp::encode::write_array_len(writer, span_events.len() as u32)?; - for event in span_events.iter() { - let event_len = 2 /* time_unix_nano, name */ + for event in span_events { + let event_len = 2 // time_unix_nano, name + (!event.attributes.is_empty()) as u32; rmp::encode::write_map_len(writer, event_len)?; @@ -193,7 +193,7 @@ fn encode_attribute_any_value( AttributeAnyValue::Array(array) => { write_uint8(writer, 6u8)?; // Array rmp::encode::write_array_len(writer, array.len() as u32)?; - for v in array.iter() { + for v in array { encode_array_element(writer, v, table)?; } } @@ -210,7 +210,6 @@ fn encode_attribute_any_value( /// - `trace_id` is not encoded in the span (it belongs to the chunk). /// - `error` is encoded as a boolean. /// - String values use streaming string interning via `StringTable`. -#[inline(always)] pub fn encode_span( writer: &mut W, span: &Span, @@ -218,7 +217,7 @@ pub fn encode_span( ) -> Result<(), ValueWriteError> { let has_attributes = !span.meta.is_empty() || !span.metrics.is_empty() || !span.meta_struct.is_empty(); - let span_len = 2 /* span_id, start — always present */ + let span_len = 2 // span_id, start — always present + (!span.service.borrow().is_empty()) as u32 + (!span.name.borrow().is_empty()) as u32 + (!span.resource.borrow().is_empty()) as u32 diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index c884566d17..d358406ea0 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -531,7 +531,7 @@ mod tests { total } TracerPayloadCollection::V04(payloads) => { - msgpack_encoder::v04::to_len(payloads) as usize + msgpack_encoder::v04::to_encoded_byte_len(payloads) as usize } TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(), } From aa0e3255cb93d2f063e7bec3056a15f24aa0b379 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Thu, 30 Apr 2026 16:39:34 +0200 Subject: [PATCH 04/13] fix: fmt --- .../src/msgpack_encoder/v1/mod.rs | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index bb503e8652..a131f569d7 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -77,7 +77,9 @@ struct PayloadAttrs<'a> { app_version: Option<&'a str>, } -fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span]>>(traces: &'a [S]) -> PayloadAttrs<'a> +fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span]>>( + traces: &'a [S], +) -> PayloadAttrs<'a> where T::Text: 'a, { @@ -102,7 +104,11 @@ where } } - PayloadAttrs { env, hostname, app_version } + PayloadAttrs { + env, + hostname, + app_version, + } } /// Promoted fields extracted from spans and written at the chunk level. @@ -130,8 +136,8 @@ where trace_id = span.trace_id; // Root span: either no parent in this chunk, or tagged _dd.top_level=1 (remote parent). - let is_root = span.parent_id == 0 - || span.metrics.get("_dd.top_level").copied().unwrap_or(0.0) == 1.0; + let is_root = + span.parent_id == 0 || span.metrics.get("_dd.top_level").copied().unwrap_or(0.0) == 1.0; if is_root { if let Some(v) = span.metrics.get("_sampling_priority_v1") { @@ -435,8 +441,14 @@ mod tests { #[test] fn test_payload_promoted_fields() { let mut meta = HashMap::new(); - meta.insert(BytesString::from_static("env"), BytesString::from_static("prod")); - meta.insert(BytesString::from_static("version"), BytesString::from_static("1.2.3")); + meta.insert( + BytesString::from_static("env"), + BytesString::from_static("prod"), + ); + meta.insert( + BytesString::from_static("version"), + BytesString::from_static("1.2.3"), + ); meta.insert( BytesString::from_static("_dd.hostname"), BytesString::from_static("my-host"), From 9122f365324baf840f8fd9973e16073e383ed9d5 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 4 May 2026 14:13:22 +0200 Subject: [PATCH 05/13] fix --- .../src/trace_exporter/trace_serializer.rs | 14 +--- .../src/msgpack_encoder/v1/mod.rs | 39 +++++++--- .../src/msgpack_encoder/v1/span_v04.rs | 76 ++++++++++++++++--- 3 files changed, 95 insertions(+), 34 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 2d1c3fcc2d..287e6207ec 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -68,19 +68,7 @@ impl<'a> TraceSerializer<'a> { traces: Vec>>, ) -> Result, TraceExporterError> { match self.output_format { - TraceExporterOutputFormat::V1 => { - // For V1, collect as V04 spans and wrap in the V1 variant so the - // serializer knows to use the V1 msgpack encoder. - let chunks = trace_utils::collect_trace_chunks(traces, false).map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?; - match chunks { - tracer_payload::TraceChunks::V04(traces) => { - Ok(tracer_payload::TraceChunks::V1(traces)) - } - _ => unreachable!(), - } - } + TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)), format => { let use_v05_format = matches!(format, TraceExporterOutputFormat::V05); trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| { diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index a131f569d7..2bf88804a3 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -140,20 +140,35 @@ where span.parent_id == 0 || span.metrics.get("_dd.top_level").copied().unwrap_or(0.0) == 1.0; if is_root { - if let Some(v) = span.metrics.get("_sampling_priority_v1") { - sampling_priority = Some(*v as i32); - } - if let Some(v) = span.meta.get("_dd.origin") { - origin = Some(v.borrow()); - } - // _dd.p.dm is a signed integer sampling mechanism code stored as a string. - if let Some(v) = span.meta.get("_dd.p.dm") { - if let Ok(dm) = v.borrow().parse::() { - sampling_mechanism = Some(dm as u32); - } - } + // Root span is authoritative: its values supersede any non-root fallback, + // including absence (a field missing on the root should not be filled from non-roots). + sampling_priority = span.metrics.get("_sampling_priority_v1").map(|v| *v as i32); + origin = span.meta.get("_dd.origin").map(|v| v.borrow()); + // _dd.p.dm is a signed integer stored as a string; unsigned_abs preserves the + // magnitude. + sampling_mechanism = span + .meta + .get("_dd.p.dm") + .and_then(|v| v.borrow().parse::().ok()) + .map(|dm| dm.unsigned_abs()); break; } + + // No root found yet — accumulate fallback values from non-root spans (partial flush). + // Root span values will override these if a root is eventually encountered. + if sampling_priority.is_none() { + sampling_priority = span.metrics.get("_sampling_priority_v1").map(|v| *v as i32); + } + if origin.is_none() { + origin = span.meta.get("_dd.origin").map(|v| v.borrow()); + } + if sampling_mechanism.is_none() { + sampling_mechanism = span + .meta + .get("_dd.p.dm") + .and_then(|v| v.borrow().parse::().ok()) + .map(|dm| dm.unsigned_abs()); + } } ChunkAttrs { diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index dca475d758..6b3a0f9040 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -26,6 +26,10 @@ pub(super) enum SpanKey { Type = 10, SpanLinks = 11, SpanEvents = 12, + Env = 13, + Version = 14, + Component = 15, + Kind = 16, } /// Integer keys for V1 span link fields. @@ -56,6 +60,18 @@ pub(super) enum AnyValueKey { Bytes = 5, } +/// Maps the `span.kind` string tag (from v0.4 meta) to the OTEL SpanKind uint32. +fn span_kind_from_str(s: &str) -> Option { + match s { + "internal" => Some(1), + "server" => Some(2), + "client" => Some(3), + "producer" => Some(4), + "consumer" => Some(5), + _ => None, + } +} + /// Encodes span links into the V1 format. /// /// Uses integer keys and string interning for string values. Each span link's @@ -98,7 +114,7 @@ pub fn encode_span_links( if !link.attributes.is_empty() { write_uint8(writer, SpanLinkKey::Attributes as u8)?; - rmp::encode::write_map_len(writer, link.attributes.len() as u32)?; + rmp::encode::write_array_len(writer, link.attributes.len() as u32 * 3)?; for (k, v) in link.attributes.iter() { table.write_interned(writer, k.borrow())?; write_uint8(writer, AnyValueKey::String as u8)?; @@ -147,7 +163,7 @@ fn encode_span_event_attributes( event: &SpanEvent, table: &mut StringTable, ) -> Result<(), ValueWriteError> { - rmp::encode::write_map_len(writer, event.attributes.len() as u32)?; + rmp::encode::write_array_len(writer, event.attributes.len() as u32 * 3)?; for (k, attribute) in event.attributes.iter() { table.write_interned(writer, k.borrow())?; encode_attribute_any_value(writer, attribute, table)?; @@ -205,18 +221,35 @@ fn encode_attribute_any_value( /// /// Key differences from v0.4: /// - Uses integer keys for all fields. -/// - `meta` and `metrics` are combined into a single `attributes` map with type-tagged values. +/// - `meta` and `metrics` are combined into a single `attributes` array (encoded as flat triplets: +/// key, type, value) with type-tagged values. Promoted meta fields are excluded. /// - `meta_struct` bytes are included in `attributes` as `Bytes` values. /// - `trace_id` is not encoded in the span (it belongs to the chunk). /// - `error` is encoded as a boolean. +/// - `env`, `version`, `component`, `span.kind` are promoted from meta to dedicated span fields. /// - String values use streaming string interning via `StringTable`. pub fn encode_span( writer: &mut W, span: &Span, table: &mut StringTable, ) -> Result<(), ValueWriteError> { - let has_attributes = - !span.meta.is_empty() || !span.metrics.is_empty() || !span.meta_struct.is_empty(); + // Extract promoted fields from meta — these get dedicated span-level keys and must + // not appear in the attributes array. + let env = span.meta.get("env").map(|v| v.borrow()); + let version = span.meta.get("version").map(|v| v.borrow()); + let component = span.meta.get("component").map(|v| v.borrow()); + let kind = span + .meta + .get("span.kind") + .and_then(|v| span_kind_from_str(v.borrow())); + + let is_promoted = + |k: &T::Text| matches!(k.borrow(), "env" | "version" | "component" | "span.kind"); + + let non_promoted_meta = span.meta.iter().filter(|(k, _)| !is_promoted(k)).count() as u32; + let attr_count = non_promoted_meta + span.metrics.len() as u32 + span.meta_struct.len() as u32; + let has_attributes = attr_count > 0; + let span_len = 2 // span_id, start — always present + (!span.service.borrow().is_empty()) as u32 + (!span.name.borrow().is_empty()) as u32 @@ -227,7 +260,11 @@ pub fn encode_span( + (span.error != 0) as u32 + has_attributes as u32 + (!span.span_links.is_empty()) as u32 - + (!span.span_events.is_empty()) as u32; + + (!span.span_events.is_empty()) as u32 + + env.is_some() as u32 + + version.is_some() as u32 + + component.is_some() as u32 + + kind.is_some() as u32; rmp::encode::write_map_len(writer, span_len)?; @@ -273,12 +310,15 @@ pub fn encode_span( } if has_attributes { - let attr_count = - span.meta.len() as u32 + span.metrics.len() as u32 + span.meta_struct.len() as u32; + // Attributes are encoded as a flat array of triplets: [key, type, value, ...]. + // Length is 3× the number of key-value pairs (per V1 spec). write_uint8(writer, SpanKey::Attributes as u8)?; - rmp::encode::write_map_len(writer, attr_count)?; + rmp::encode::write_array_len(writer, attr_count * 3)?; for (k, v) in span.meta.iter() { + if is_promoted(k) { + continue; + } table.write_interned(writer, k.borrow())?; write_uint8(writer, AnyValueKey::String as u8)?; table.write_interned(writer, v.borrow())?; @@ -305,5 +345,23 @@ pub fn encode_span( encode_span_events(writer, &span.span_events, table)?; } + // Promoted span-level fields (env, version, component, span.kind → kind uint32). + if let Some(v) = env { + write_uint8(writer, SpanKey::Env as u8)?; + table.write_interned(writer, v)?; + } + if let Some(v) = version { + write_uint8(writer, SpanKey::Version as u8)?; + table.write_interned(writer, v)?; + } + if let Some(v) = component { + write_uint8(writer, SpanKey::Component as u8)?; + table.write_interned(writer, v)?; + } + if let Some(k) = kind { + write_uint8(writer, SpanKey::Kind as u8)?; + write_uint(writer, k as u64)?; + } + Ok(()) } From b865b386d1844731d89fac367409b5314fb8c4a5 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 4 May 2026 17:02:36 +0200 Subject: [PATCH 06/13] feat: add integration test --- ...xporter_v04_to_v1_trace_snapshot_test.json | 51 ++++++++ .../tests/test_trace_exporter.rs | 74 ++++++++++++ .../src/msgpack_encoder/v1/mod.rs | 111 +++++++++++++++++- .../src/msgpack_encoder/v1/span_v04.rs | 8 +- .../src/test_utils/datadog_test_agent.rs | 8 +- 5 files changed, 245 insertions(+), 7 deletions(-) create mode 100644 libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json diff --git a/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json b/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json new file mode 100644 index 0000000000..ae73ca83b3 --- /dev/null +++ b/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json @@ -0,0 +1,51 @@ +[[ + { + "name": "test_exporter_v04_v1_snapshot_root", + "service": "test-service", + "resource": "test-resource", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "web", + "meta": { + "_dd.hostname": "my-host", + "_dd.origin": "lambda", + "_dd.p.dm": "-4", + "_dd.p.tid": "0x0", + "component": "http", + "env": "test-env", + "runtime-id": "test-runtime-id-value", + "service": "test-service", + "span.kind": "server", + "version": "1.0.0" + }, + "metrics": { + "_dd.top_level": 1.0, + "_sampling_priority_v1": 1 + }, + "duration": 5, + "start": 0 + }, + { + "name": "test_exporter_v04_v1_snapshot_child", + "service": "test-service", + "resource": "test-resource", + "trace_id": 0, + "span_id": 2, + "parent_id": 1, + "meta": { + "_dd.origin": "lambda", + "_dd.p.dm": "-4", + "_dd.p.tid": "0x0", + "env": "test-env", + "runtime-id": "test-runtime-id-value", + "service": "test-service" + }, + "metrics": { + "_dd_metric1": 1.0, + "_dd_metric2": 2.0, + "_sampling_priority_v1": 1 + }, + "duration": 5, + "start": 1 + }]] diff --git a/libdd-data-pipeline/tests/test_trace_exporter.rs b/libdd-data-pipeline/tests/test_trace_exporter.rs index fe318d4654..ed6ba60465 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter.rs @@ -250,6 +250,80 @@ mod tracing_integration_tests { test_agent.assert_snapshot(snapshot_name).await; } + fn get_v04_to_v1_trace_snapshot_test_payload(name_prefix: &str) -> Vec { + // Root span: exercises chunk-level attrs (sampling priority, origin, mechanism) + // and span-level promoted fields (env, version, component, span.kind). + let mut root_span = create_test_json_span(1234, 12341, 0, 0, true); + root_span["name"] = json!(format!("{name_prefix}_root")); + root_span["type"] = json!("web"); + root_span["meta"] = json!({ + "env": "test-env", + "version": "1.0.0", + "component": "http", + "span.kind": "server", + "_dd.hostname": "my-host", + "_dd.origin": "lambda", + "_dd.p.dm": "-4", + "runtime-id": "test-runtime-id-value", + "service": "test-service", + }); + root_span["metrics"] = json!({ + "_sampling_priority_v1": 1.0, + "_dd.top_level": 1.0, + }); + + // Child span: exercises metrics and meta attributes without promoted fields. + let mut child_span = create_test_json_span(1234, 12342, 12341, 1, false); + child_span["name"] = json!(format!("{name_prefix}_child")); + child_span["metrics"] = json!({ + "_dd_metric1": 1.0, + "_dd_metric2": 2.0, + }); + + rmp_serde::to_vec_named(&vec![vec![root_span, child_span]]).unwrap() + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn compare_v04_to_v1_trace_snapshot_test() { + let relative_snapshot_path = "libdd-data-pipeline/tests/snapshots/"; + let snapshot_name = "compare_exporter_v04_to_v1_trace_snapshot_test"; + let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await; + let url = test_agent.get_base_uri().await; + + test_agent.start_session(snapshot_name, None).await; + + let task_result = task::spawn_blocking(move || { + let mut builder = TraceExporter::::builder(); + builder + .set_url(url.to_string().as_ref()) + .set_language("test-lang") + .set_language_version("2.0") + .set_language_interpreter_vendor("vendor") + .set_language_interpreter("interpreter") + .set_tracer_version("1.0") + .set_env("test_env") + .set_service("test") + .set_test_session_token(snapshot_name) + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V1); + + let trace_exporter = builder + .build::() + .expect("Unable to build TraceExporter"); + + let data = get_v04_to_v1_trace_snapshot_test_payload("test_exporter_v04_v1_snapshot"); + + let response = trace_exporter.send(data.as_ref()); + assert!(response.is_ok(), "send failed: {:?}", response.err()); + }) + .await; + + assert!(task_result.is_ok()); + + test_agent.assert_snapshot(snapshot_name).await; + } + #[cfg_attr(miri, ignore)] #[cfg(target_os = "linux")] #[tokio::test] diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index 2bf88804a3..07184b4914 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -17,6 +17,8 @@ mod trace_key { pub const ENV_REF: u8 = 7; pub const HOSTNAME_REF: u8 = 8; pub const APP_VERSION_REF: u8 = 9; + /// Payload-level attributes map (e.g. `_dd.apm_mode`, `_dd.git.commit.sha`). + pub const ATTRIBUTES: u8 = 10; pub const CHUNKS: u8 = 11; } @@ -75,6 +77,10 @@ struct PayloadAttrs<'a> { env: Option<&'a str>, hostname: Option<&'a str>, app_version: Option<&'a str>, + /// `_dd.apm_mode` span tag, promoted to payload-level attributes. + apm_mode: Option<&'a str>, + /// `_dd.git.commit.sha` span tag, promoted to payload-level attributes. + git_commit_sha: Option<&'a str>, } fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span]>>( @@ -86,6 +92,8 @@ where let mut env = None; let mut hostname = None; let mut app_version = None; + let mut apm_mode = None; + let mut git_commit_sha = None; 'outer: for trace in traces { for span in trace.as_ref() { @@ -98,7 +106,18 @@ where if app_version.is_none() { app_version = span.meta.get("version").map(|v| v.borrow()); } - if env.is_some() && hostname.is_some() && app_version.is_some() { + if apm_mode.is_none() { + apm_mode = span.meta.get("_dd.apm_mode").map(|v| v.borrow()); + } + if git_commit_sha.is_none() { + git_commit_sha = span.meta.get("_dd.git.commit.sha").map(|v| v.borrow()); + } + if env.is_some() + && hostname.is_some() + && app_version.is_some() + && apm_mode.is_some() + && git_commit_sha.is_some() + { break 'outer; } } @@ -108,6 +127,8 @@ where env, hostname, app_version, + apm_mode, + git_commit_sha, } } @@ -187,6 +208,7 @@ where /// trace_key::ENV_REF (7) → str|uint // optional, interned /// trace_key::HOSTNAME_REF (8) → str|uint // optional, interned /// trace_key::APP_VERSION (9) → str|uint // optional, interned +/// trace_key::ATTRIBUTES (10) → Array[...] // optional, flat triplets: key, type, value /// trace_key::CHUNKS (11) → Array[Chunk, ...] /// } /// ``` @@ -197,10 +219,15 @@ fn encode_payload]>>( let mut table = StringTable::new(); let payload_attrs = extract_payload_attrs(traces); + let attr_count = + payload_attrs.apm_mode.is_some() as u32 + payload_attrs.git_commit_sha.is_some() as u32; + let has_attributes = attr_count > 0; + let map_len = 1u32 // chunks always present + payload_attrs.env.is_some() as u32 + payload_attrs.hostname.is_some() as u32 - + payload_attrs.app_version.is_some() as u32; + + payload_attrs.app_version.is_some() as u32 + + has_attributes as u32; write_map_len(writer, map_len)?; @@ -219,6 +246,23 @@ fn encode_payload]>>( table.write_interned(writer, app_version)?; } + if has_attributes { + // Encoded as a flat array of triplets: [key, type_uint, value, ...] + // String values use type discriminant 1. + write_uint8(writer, trace_key::ATTRIBUTES)?; + write_array_len(writer, attr_count * 3)?; + if let Some(v) = payload_attrs.apm_mode { + table.write_interned(writer, "_dd.apm_mode")?; + write_uint8(writer, span_v04::AnyValueKey::String as u8)?; + table.write_interned(writer, v)?; + } + if let Some(v) = payload_attrs.git_commit_sha { + table.write_interned(writer, "_dd.git.commit.sha")?; + write_uint8(writer, span_v04::AnyValueKey::String as u8)?; + table.write_interned(writer, v)?; + } + } + write_uint8(writer, trace_key::CHUNKS)?; write_array_len(writer, traces.len() as u32)?; for trace in traces { @@ -494,4 +538,67 @@ mod tests { "hostname 'my-host' should appear in payload" ); } + + #[test] + fn test_payload_attributes_apm_mode_and_git_commit_sha() { + let mut meta = HashMap::new(); + meta.insert( + BytesString::from_static("_dd.apm_mode"), + BytesString::from_static("ssi"), + ); + meta.insert( + BytesString::from_static("_dd.git.commit.sha"), + BytesString::from_static("abc123"), + ); + + let span = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 1, + span_id: 1, + parent_id: 0, + start: 1000, + duration: 100, + meta, + ..Default::default() + }; + + let encoded = to_vec(&[vec![span]]); + + // Both attribute strings must appear in the payload bytes. + let ssi_bytes = b"ssi"; + assert!( + encoded.windows(ssi_bytes.len()).any(|w| w == ssi_bytes), + "apm_mode 'ssi' should appear in payload" + ); + let sha_bytes = b"abc123"; + assert!( + encoded.windows(sha_bytes.len()).any(|w| w == sha_bytes), + "git commit sha 'abc123' should appear in payload" + ); + // The attribute key names must also be present (first occurrence is a raw str). + let apm_key = b"_dd.apm_mode"; + assert!( + encoded.windows(apm_key.len()).any(|w| w == apm_key), + "_dd.apm_mode key should appear in payload" + ); + let git_key = b"_dd.git.commit.sha"; + assert!( + encoded.windows(git_key.len()).any(|w| w == git_key), + "_dd.git.commit.sha key should appear in payload" + ); + } + + #[test] + fn test_payload_attributes_absent_when_no_relevant_tags() { + // A span with no _dd.apm_mode or _dd.git.commit.sha must not produce key 10. + let span = make_span("svc", "op", 1, 1, 0); + let encoded = to_vec(&[vec![span]]); + let apm_key = b"_dd.apm_mode"; + assert!( + !encoded.windows(apm_key.len()).any(|w| w == apm_key), + "key 10 should be absent when no relevant tags are set" + ); + } } diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index 6b3a0f9040..edd02876db 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -4,8 +4,8 @@ use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; use crate::span::TraceData; use rmp::encode::{ - write_bin, write_bool, write_f64, write_i64, write_sint, write_u64, write_uint, write_uint8, - RmpWrite, ValueWriteError, + write_bin, write_bool, write_f64, write_sint, write_u64, write_uint, write_uint8, RmpWrite, + ValueWriteError, }; use std::borrow::Borrow; @@ -287,7 +287,7 @@ pub fn encode_span( write_u64(writer, span.span_id)?; write_uint8(writer, SpanKey::Start as u8)?; - write_i64(writer, span.start)?; + write_u64(writer, span.start as u64)?; if span.parent_id != 0 { write_uint8(writer, SpanKey::ParentId as u8)?; @@ -296,7 +296,7 @@ pub fn encode_span( if span.duration != 0 { write_uint8(writer, SpanKey::Duration as u8)?; - write_i64(writer, span.duration)?; + write_u64(writer, span.duration as u64)?; } if span.error != 0 { diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index 9710700b05..d8580e4509 100644 --- a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -14,7 +14,7 @@ use std::str::FromStr; use std::time::Duration; const TEST_AGENT_IMAGE_NAME: &str = "ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent"; -const TEST_AGENT_IMAGE_TAG: &str = "v1.39.0"; +const TEST_AGENT_IMAGE_TAG: &str = "v1.52.0"; const TEST_AGENT_READY_MSG: &str = "INFO:ddapm_test_agent.agent:Trace request stall seconds setting set to 0.0."; @@ -167,6 +167,12 @@ impl DatadogAgentContainerBuilder { )); } + // The image default is SNAPSHOT_CI=1 (CI mode). Override with the host's CI setting so + // local runs (CI unset → "0") can auto-generate missing snapshot files while CI pipelines + // (CI=1) validate against committed ones. + let snapshot_ci = std::env::var("CI").unwrap_or_else(|_| "0".to_string()); + env_vars.push(("SNAPSHOT_CI".to_string(), snapshot_ci)); + DatadogAgentContainerBuilder { mounts, env_vars, From 29cab17b7caf0e96bec3cc0fceb81268c3da7481 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Tue, 5 May 2026 15:02:53 +0200 Subject: [PATCH 07/13] fix: address lasts comments --- libdd-data-pipeline/src/trace_exporter/mod.rs | 1 + .../src/trace_exporter/trace_serializer.rs | 17 ++- libdd-trace-utils/src/msgpack_encoder/mod.rs | 1 + .../src/msgpack_encoder/v1/mod.rs | 119 +++++++++++++++--- .../src/msgpack_encoder/v1/span_v04.rs | 35 +++--- libdd-trace-utils/src/send_data/mod.rs | 1 + libdd-trace-utils/src/tracer_header_tags.rs | 3 + 7 files changed, 141 insertions(+), 36 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index cde970d360..eef6c22df8 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -154,6 +154,7 @@ impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> { tracer_version: &tags.tracer_version, lang_interpreter: &tags.language_interpreter, lang_vendor: &tags.language_interpreter_vendor, + runtime_id: &tags.runtime_id, client_computed_stats: tags.client_computed_stats, client_computed_top_level: tags.client_computed_top_level, ..Default::default() diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 287e6207ec..a42d796b28 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -52,8 +52,8 @@ impl<'a> TraceSerializer<'a> { ) -> Result { let payload = self.collect_and_process_traces(traces)?; let chunks = payload.size(); + let mp_payload = self.serialize_payload(&payload, &header_tags)?; let headers = self.build_traces_headers(header_tags, chunks); - let mp_payload = self.serialize_payload(&payload)?; Ok(PreparedTracesPayload { data: mp_payload, @@ -97,13 +97,22 @@ impl<'a> TraceSerializer<'a> { fn serialize_payload( &self, payload: &tracer_payload::TraceChunks, + header_tags: &TracerHeaderTags, ) -> Result, TraceExporterError> { match payload { tracer_payload::TraceChunks::V04(p) => Ok(msgpack_encoder::v04::to_vec(p)), tracer_payload::TraceChunks::V05(p) => { rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization) } - tracer_payload::TraceChunks::V1(p) => Ok(msgpack_encoder::v1::to_vec(p)), + tracer_payload::TraceChunks::V1(p) => { + let metadata = msgpack_encoder::v1::PayloadMetadata { + language_name: header_tags.lang, + language_version: header_tags.lang_version, + tracer_version: header_tags.tracer_version, + runtime_id: header_tags.runtime_id, + }; + Ok(msgpack_encoder::v1::to_vec(p, &metadata)) + } } } } @@ -259,7 +268,7 @@ mod tests { .collect_and_process_traces(original_traces.clone()) .unwrap(); - let result = serializer.serialize_payload(&payload); + let result = serializer.serialize_payload(&payload, &TracerHeaderTags::default()); assert!(result.is_ok()); let serialized = result.unwrap(); @@ -294,7 +303,7 @@ mod tests { .collect_and_process_traces(original_traces.clone()) .unwrap(); - let result = serializer.serialize_payload(&payload); + let result = serializer.serialize_payload(&payload, &TracerHeaderTags::default()); assert!(result.is_ok()); let serialized = result.unwrap(); diff --git a/libdd-trace-utils/src/msgpack_encoder/mod.rs b/libdd-trace-utils/src/msgpack_encoder/mod.rs index d8c9a11d3e..06898a33eb 100644 --- a/libdd-trace-utils/src/msgpack_encoder/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/mod.rs @@ -4,6 +4,7 @@ pub mod v04; pub mod v1; +/// A writer that counts bytes without storing them, used to compute encoded payload size. pub(crate) struct CountLength(u32); impl std::io::Write for CountLength { diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index 07184b4914..aa935d47a4 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -14,6 +14,10 @@ use std::collections::HashMap; /// Integer keys for the top-level V1 trace payload map. mod trace_key { + pub const LANGUAGE_NAME: u8 = 3; + pub const LANGUAGE_VERSION: u8 = 4; + pub const TRACER_VERSION: u8 = 5; + pub const RUNTIME_ID: u8 = 6; pub const ENV_REF: u8 = 7; pub const HOSTNAME_REF: u8 = 8; pub const APP_VERSION_REF: u8 = 9; @@ -72,6 +76,18 @@ impl StringTable { } } +/// Tracer-level metadata encoded at the top of the V1 payload (keys 3–6). +/// +/// These fields are required by the V1 spec: tracers must include them in the payload +/// (not only in HTTP headers). +#[derive(Default)] +pub struct PayloadMetadata<'a> { + pub language_name: &'a str, + pub language_version: &'a str, + pub tracer_version: &'a str, + pub runtime_id: &'a str, +} + /// Promoted fields extracted from the payload's spans, written at the top-level map. struct PayloadAttrs<'a> { env: Option<&'a str>, @@ -215,6 +231,7 @@ where fn encode_payload]>>( writer: &mut W, traces: &[S], + metadata: &PayloadMetadata, ) -> Result<(), ValueWriteError> { let mut table = StringTable::new(); let payload_attrs = extract_payload_attrs(traces); @@ -224,6 +241,10 @@ fn encode_payload]>>( let has_attributes = attr_count > 0; let map_len = 1u32 // chunks always present + + (!metadata.language_name.is_empty()) as u32 + + (!metadata.language_version.is_empty()) as u32 + + (!metadata.tracer_version.is_empty()) as u32 + + (!metadata.runtime_id.is_empty()) as u32 + payload_attrs.env.is_some() as u32 + payload_attrs.hostname.is_some() as u32 + payload_attrs.app_version.is_some() as u32 @@ -231,6 +252,26 @@ fn encode_payload]>>( write_map_len(writer, map_len)?; + if !metadata.language_name.is_empty() { + write_uint8(writer, trace_key::LANGUAGE_NAME)?; + table.write_interned(writer, metadata.language_name)?; + } + + if !metadata.language_version.is_empty() { + write_uint8(writer, trace_key::LANGUAGE_VERSION)?; + table.write_interned(writer, metadata.language_version)?; + } + + if !metadata.tracer_version.is_empty() { + write_uint8(writer, trace_key::TRACER_VERSION)?; + table.write_interned(writer, metadata.tracer_version)?; + } + + if !metadata.runtime_id.is_empty() { + write_uint8(writer, trace_key::RUNTIME_ID)?; + table.write_interned(writer, metadata.runtime_id)?; + } + if let Some(env) = payload_attrs.env { write_uint8(writer, trace_key::ENV_REF)?; table.write_interned(writer, env)?; @@ -332,29 +373,37 @@ pub fn write_to_slice]>>( // &mut &mut [u8] lets the caller see the slice shrink as bytes are written. slice: &mut &mut [u8], traces: &[S], + metadata: &PayloadMetadata, ) -> Result<(), ValueWriteError> { - encode_payload(slice, traces) + encode_payload(slice, traces, metadata) } /// Serializes traces into a `Vec` using the V1 msgpack format. -pub fn to_vec]>>(traces: &[S]) -> Vec { - to_vec_with_capacity(traces, 0) +pub fn to_vec]>>( + traces: &[S], + metadata: &PayloadMetadata, +) -> Vec { + to_vec_with_capacity(traces, 0, metadata) } /// Serializes traces into a `Vec` with a pre-allocated capacity. pub fn to_vec_with_capacity]>>( traces: &[S], capacity: u32, + metadata: &PayloadMetadata, ) -> Vec { let mut buf = ByteBuf::with_capacity(capacity as usize); - let _ = encode_payload(&mut buf, traces); + let _ = encode_payload(&mut buf, traces, metadata); // infallible: ByteBuf write never fails buf.into_vec() } /// Returns the number of bytes the V1 payload for `traces` would occupy. -pub fn to_encoded_byte_len]>>(traces: &[S]) -> u32 { +pub fn to_encoded_byte_len]>>( + traces: &[S], + metadata: &PayloadMetadata, +) -> u32 { let mut counter = super::CountLength(0); - let _ = encode_payload(&mut counter, traces); + let _ = encode_payload(&mut counter, traces, metadata); // infallible: CountLength write never fails counter.0 } @@ -389,14 +438,14 @@ mod tests { fn test_to_vec_non_empty() { let spans = vec![make_span("svc", "op", 42, 1, 0)]; let traces = vec![spans]; - let encoded = to_vec(&traces); + let encoded = to_vec(&traces, &PayloadMetadata::default()); assert!(!encoded.is_empty()); } #[test] fn test_to_vec_empty_traces() { let traces: Vec> = vec![]; - let encoded = to_vec(&traces); + let encoded = to_vec(&traces, &PayloadMetadata::default()); // Must still produce a valid msgpack map with an empty chunks array. assert!(!encoded.is_empty()); } @@ -412,8 +461,8 @@ mod tests { let s_single = make_span("my-service", "op1", 1, 1, 0); let traces_single = vec![vec![s_single]]; - let encoded_two = to_vec(&traces_two); - let encoded_single = to_vec(&traces_single); + let encoded_two = to_vec(&traces_two, &PayloadMetadata::default()); + let encoded_single = to_vec(&traces_single, &PayloadMetadata::default()); // The two-trace payload should be less than 2× the single-trace payload // if interning is working (the second "my-service" is encoded as an integer). @@ -449,7 +498,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![root]]); + let encoded = to_vec(&[vec![root]], &PayloadMetadata::default()); assert!(!encoded.is_empty()); // The payload must contain "lambda" somewhere (the origin string). let lambda_bytes = b"lambda"; @@ -468,8 +517,9 @@ mod tests { make_span("svc", "child", 1, 2, 1), ]; let traces = vec![spans]; - let encoded = to_vec(&traces); - let len = to_encoded_byte_len(&traces); + let meta = PayloadMetadata::default(); + let encoded = to_vec(&traces, &meta); + let len = to_encoded_byte_len(&traces, &meta); assert_eq!(encoded.len() as u32, len); } @@ -493,7 +543,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![root]]); + let encoded = to_vec(&[vec![root]], &PayloadMetadata::default()); assert!(!encoded.is_empty()); } @@ -526,7 +576,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![span]]); + let encoded = to_vec(&[vec![span]], &PayloadMetadata::default()); let prod_bytes = b"prod"; assert!( encoded.windows(prod_bytes.len()).any(|w| w == prod_bytes), @@ -564,7 +614,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![span]]); + let encoded = to_vec(&[vec![span]], &PayloadMetadata::default()); // Both attribute strings must appear in the payload bytes. let ssi_bytes = b"ssi"; @@ -594,11 +644,46 @@ mod tests { fn test_payload_attributes_absent_when_no_relevant_tags() { // A span with no _dd.apm_mode or _dd.git.commit.sha must not produce key 10. let span = make_span("svc", "op", 1, 1, 0); - let encoded = to_vec(&[vec![span]]); + let encoded = to_vec(&[vec![span]], &PayloadMetadata::default()); let apm_key = b"_dd.apm_mode"; assert!( !encoded.windows(apm_key.len()).any(|w| w == apm_key), "key 10 should be absent when no relevant tags are set" ); } + + #[test] + fn test_payload_metadata_fields_present() { + let span = make_span("svc", "op", 1, 1, 0); + let metadata = PayloadMetadata { + language_name: "python", + language_version: "3.11", + tracer_version: "2.0.0", + runtime_id: "abc-123-uuid", + }; + let encoded = to_vec(&[vec![span]], &metadata); + + for s in &[b"python" as &[u8], b"3.11", b"2.0.0", b"abc-123-uuid"] { + assert!( + encoded.windows(s.len()).any(|w| w == *s), + "{} should appear in payload", + std::str::from_utf8(s).unwrap() + ); + } + } + + #[test] + fn test_payload_metadata_absent_when_empty() { + let span = make_span("svc", "op", 1, 1, 0); + let encoded_with = to_vec( + &[vec![span.clone()]], + &PayloadMetadata { + language_name: "go", + ..Default::default() + }, + ); + let encoded_without = to_vec(&[vec![span]], &PayloadMetadata::default()); + // Payload with metadata must be larger (it carries extra fields). + assert!(encoded_with.len() > encoded_without.len()); + } } diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index edd02876db..9c60f573da 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -57,7 +57,12 @@ pub(super) enum AnyValueKey { String = 1, Bool = 2, Double = 3, + Int64 = 4, Bytes = 5, + Array = 6, + /// Not used in V04→V1 conversion (V04 has no key-value list type), defined for completeness. + #[allow(dead_code)] + KeyValueList = 7, } /// Maps the `span.kind` string tag (from v0.4 meta) to the OTEL SpanKind uint32. @@ -102,16 +107,6 @@ pub fn encode_span_links( write_u64(writer, link.span_id)?; } - if !link.tracestate.borrow().is_empty() { - write_uint8(writer, SpanLinkKey::TraceState as u8)?; - table.write_interned(writer, link.tracestate.borrow())?; - } - - if link.flags != 0 { - write_uint8(writer, SpanLinkKey::Flags as u8)?; - write_uint(writer, link.flags as u64)?; - } - if !link.attributes.is_empty() { write_uint8(writer, SpanLinkKey::Attributes as u8)?; rmp::encode::write_array_len(writer, link.attributes.len() as u32 * 3)?; @@ -121,6 +116,16 @@ pub fn encode_span_links( table.write_interned(writer, v.borrow())?; } } + + if !link.tracestate.borrow().is_empty() { + write_uint8(writer, SpanLinkKey::TraceState as u8)?; + table.write_interned(writer, link.tracestate.borrow())?; + } + + if link.flags != 0 { + write_uint8(writer, SpanLinkKey::Flags as u8)?; + write_uint(writer, link.flags as u64)?; + } } Ok(()) @@ -190,14 +195,14 @@ fn encode_attribute_any_value( write_uint8(writer, AnyValueKey::Bool as u8)?; write_bool(writer, *b).map_err(ValueWriteError::InvalidDataWrite)?; } - AttributeArrayValue::Integer(i) => { - write_uint8(writer, 4u8)?; // Int64 - write_sint(writer, *i)?; - } AttributeArrayValue::Double(d) => { write_uint8(writer, AnyValueKey::Double as u8)?; write_f64(writer, *d)?; } + AttributeArrayValue::Integer(i) => { + write_uint8(writer, AnyValueKey::Int64 as u8)?; + write_sint(writer, *i)?; + } } Ok(()) } @@ -207,7 +212,7 @@ fn encode_attribute_any_value( encode_array_element(writer, value, table)?; } AttributeAnyValue::Array(array) => { - write_uint8(writer, 6u8)?; // Array + write_uint8(writer, AnyValueKey::Array as u8)?; rmp::encode::write_array_len(writer, array.len() as u32)?; for v in array { encode_array_element(writer, v, table)?; diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index d358406ea0..81300184c2 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -475,6 +475,7 @@ mod tests { lang_vendor: "vendor", tracer_version: "1.0", container_id: "id", + runtime_id: "", client_computed_top_level: false, client_computed_stats: false, dropped_p0_traces: 0, diff --git a/libdd-trace-utils/src/tracer_header_tags.rs b/libdd-trace-utils/src/tracer_header_tags.rs index b4a2cda896..636e72a883 100644 --- a/libdd-trace-utils/src/tracer_header_tags.rs +++ b/libdd-trace-utils/src/tracer_header_tags.rs @@ -27,6 +27,7 @@ pub struct TracerHeaderTags<'a> { pub lang_vendor: &'a str, pub tracer_version: &'a str, pub container_id: &'a str, + pub runtime_id: &'a str, // specifies that the client has marked top-level spans, when set. If the header is present // this value will resolve to 'true' pub client_computed_top_level: bool, @@ -167,6 +168,7 @@ mod tests { lang_vendor: "vendor", tracer_version: "1.0", container_id: "id", + runtime_id: "", client_computed_top_level: true, client_computed_stats: true, dropped_p0_traces: 12, @@ -203,6 +205,7 @@ mod tests { lang_vendor: "vendor", tracer_version: "1.0", container_id: "", + runtime_id: "", client_computed_top_level: false, client_computed_stats: false, dropped_p0_spans: 0, From cfd258f2440e46d762e651af16962fa56b101a6b Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 18 May 2026 14:02:33 +0200 Subject: [PATCH 08/13] fix: apply suggestion --- libdd-data-pipeline/src/trace_exporter/mod.rs | 43 +---------- .../src/trace_exporter/stats.rs | 19 ----- .../src/trace_exporter/trace_serializer.rs | 32 ++++----- libdd-trace-stats/src/stats_exporter.rs | 20 ++++++ libdd-trace-utils/src/lib.rs | 1 + .../src/msgpack_encoder/v1/mod.rs | 72 ++++++++----------- libdd-trace-utils/src/send_data/mod.rs | 1 - libdd-trace-utils/src/tracer_header_tags.rs | 3 - libdd-trace-utils/src/tracer_metadata.rs | 44 ++++++++++++ 9 files changed, 114 insertions(+), 121 deletions(-) create mode 100644 libdd-trace-utils/src/tracer_metadata.rs diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index eef6c22df8..27420abc48 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -128,45 +128,7 @@ fn add_path(url: &Uri, path: &str) -> Uri { Uri::from_parts(parts).unwrap() } -#[derive(Clone, Default, Debug)] -pub struct TracerMetadata { - pub hostname: String, - pub env: String, - pub app_version: String, - pub runtime_id: String, - pub service: String, - pub tracer_version: String, - pub language: String, - pub language_version: String, - pub language_interpreter: String, - pub language_interpreter_vendor: String, - pub git_commit_sha: String, - pub process_tags: String, - pub client_computed_stats: bool, - pub client_computed_top_level: bool, -} - -impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> { - fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> { - TracerHeaderTags::<'_> { - lang: &tags.language, - lang_version: &tags.language_version, - tracer_version: &tags.tracer_version, - lang_interpreter: &tags.language_interpreter, - lang_vendor: &tags.language_interpreter_vendor, - runtime_id: &tags.runtime_id, - client_computed_stats: tags.client_computed_stats, - client_computed_top_level: tags.client_computed_top_level, - ..Default::default() - } - } -} - -impl<'a> From<&'a TracerMetadata> for HeaderMap { - fn from(tags: &'a TracerMetadata) -> HeaderMap { - TracerHeaderTags::from(tags).into() - } -} +pub use libdd_trace_utils::tracer_metadata::TracerMetadata; /// Handles for the background workers owned by a [`TraceExporter`]. #[cfg(not(target_arch = "wasm32"))] @@ -623,7 +585,8 @@ impl TraceExporter { self.output_format, self.agent_payload_response_version.as_ref(), ); - let prepared = match serializer.prepare_traces_payload(traces, header_tags) { + let prepared = match serializer.prepare_traces_payload(traces, header_tags, &self.metadata) + { Ok(p) => p, Err(e) => { error!("Error serializing traces: {e}"); diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 1fbaf82d0d..4220321c41 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -257,22 +257,3 @@ pub(crate) fn is_stats_worker_active(client_side_stats: &ArcSwap for StatsMetadata { - fn from(m: TracerMetadata) -> StatsMetadata { - StatsMetadata { - hostname: m.hostname, - env: m.env, - app_version: m.app_version, - runtime_id: m.runtime_id, - language: m.language, - lang_version: m.language_version, - lang_interpreter: m.language_interpreter, - lang_vendor: m.language_interpreter_vendor, - tracer_version: m.tracer_version, - git_commit_sha: m.git_commit_sha, - process_tags: m.process_tags, - service: m.service, - } - } -} diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index a42d796b28..4f57c5c8dc 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -14,6 +14,7 @@ use libdd_trace_utils::msgpack_decoder::decode::error::DecodeError; use libdd_trace_utils::msgpack_encoder; use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; +use libdd_trace_utils::tracer_metadata::TracerMetadata; use libdd_trace_utils::tracer_payload; /// Prepared traces payload ready for sending to the agent @@ -49,10 +50,11 @@ impl<'a> TraceSerializer<'a> { &self, traces: Vec>>, header_tags: TracerHeaderTags, + metadata: &TracerMetadata, ) -> Result { let payload = self.collect_and_process_traces(traces)?; let chunks = payload.size(); - let mp_payload = self.serialize_payload(&payload, &header_tags)?; + let mp_payload = self.serialize_payload(&payload, metadata)?; let headers = self.build_traces_headers(header_tags, chunks); Ok(PreparedTracesPayload { @@ -97,22 +99,14 @@ impl<'a> TraceSerializer<'a> { fn serialize_payload( &self, payload: &tracer_payload::TraceChunks, - header_tags: &TracerHeaderTags, + metadata: &TracerMetadata, ) -> Result, TraceExporterError> { match payload { tracer_payload::TraceChunks::V04(p) => Ok(msgpack_encoder::v04::to_vec(p)), tracer_payload::TraceChunks::V05(p) => { rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization) } - tracer_payload::TraceChunks::V1(p) => { - let metadata = msgpack_encoder::v1::PayloadMetadata { - language_name: header_tags.lang, - language_version: header_tags.lang_version, - tracer_version: header_tags.tracer_version, - runtime_id: header_tags.runtime_id, - }; - Ok(msgpack_encoder::v1::to_vec(p, &metadata)) - } + tracer_payload::TraceChunks::V1(p) => Ok(msgpack_encoder::v1::to_vec(p, metadata)), } } } @@ -268,7 +262,7 @@ mod tests { .collect_and_process_traces(original_traces.clone()) .unwrap(); - let result = serializer.serialize_payload(&payload, &TracerHeaderTags::default()); + let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); assert!(result.is_ok()); let serialized = result.unwrap(); @@ -303,7 +297,7 @@ mod tests { .collect_and_process_traces(original_traces.clone()) .unwrap(); - let result = serializer.serialize_payload(&payload, &TracerHeaderTags::default()); + let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); assert!(result.is_ok()); let serialized = result.unwrap(); @@ -339,7 +333,8 @@ mod tests { ]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = + serializer.prepare_traces_payload(traces, header_tags, &TracerMetadata::default()); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -358,7 +353,8 @@ mod tests { let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = + serializer.prepare_traces_payload(traces, header_tags, &TracerMetadata::default()); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -374,7 +370,8 @@ mod tests { let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = + serializer.prepare_traces_payload(traces, header_tags, &TracerMetadata::default()); assert!(result.is_ok()); let prepared = result.unwrap(); @@ -388,7 +385,8 @@ mod tests { let traces: Vec> = vec![]; let header_tags = create_test_header_tags(); - let result = serializer.prepare_traces_payload(traces, header_tags); + let result = + serializer.prepare_traces_payload(traces, header_tags, &TracerMetadata::default()); assert!(result.is_ok()); let prepared = result.unwrap(); diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index e0130804cd..9f41abedc5 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -17,6 +17,7 @@ use libdd_shared_runtime::Worker; use libdd_trace_protobuf::pb; use libdd_trace_utils::send_with_retry::{send_with_retry, RetryStrategy}; use libdd_trace_utils::trace_utils::TracerHeaderTags; +use libdd_trace_utils::tracer_metadata::TracerMetadata; use std::fmt::Debug; use tracing::error; @@ -52,6 +53,25 @@ impl<'a> From<&'a StatsMetadata> for TracerHeaderTags<'a> { } } +impl From for StatsMetadata { + fn from(m: TracerMetadata) -> StatsMetadata { + StatsMetadata { + hostname: m.hostname, + env: m.env, + app_version: m.app_version, + runtime_id: m.runtime_id, + language: m.language, + lang_version: m.language_version, + lang_interpreter: m.language_interpreter, + lang_vendor: m.language_interpreter_vendor, + tracer_version: m.tracer_version, + git_commit_sha: m.git_commit_sha, + process_tags: m.process_tags, + service: m.service, + } + } +} + /// An exporter that concentrates and sends stats to the agent. /// /// `H` is the HTTP client implementation, see [`HttpClientTrait`]. Leaf crates diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index 5218e30c1a..aa8d93c887 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -18,6 +18,7 @@ pub mod stats_utils; pub mod test_utils; pub mod trace_utils; pub mod tracer_header_tags; +pub mod tracer_metadata; pub mod tracer_payload; pub mod span; diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index aa935d47a4..8f7bb783f8 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -5,6 +5,7 @@ mod span_v04; use crate::span::v04::Span; use crate::span::TraceData; +use crate::tracer_metadata::TracerMetadata; use rmp::encode::{ write_array_len, write_bin, write_map_len, write_sint, write_str, write_uint, write_uint8, ByteBuf, RmpWrite, ValueWriteError, @@ -76,18 +77,6 @@ impl StringTable { } } -/// Tracer-level metadata encoded at the top of the V1 payload (keys 3–6). -/// -/// These fields are required by the V1 spec: tracers must include them in the payload -/// (not only in HTTP headers). -#[derive(Default)] -pub struct PayloadMetadata<'a> { - pub language_name: &'a str, - pub language_version: &'a str, - pub tracer_version: &'a str, - pub runtime_id: &'a str, -} - /// Promoted fields extracted from the payload's spans, written at the top-level map. struct PayloadAttrs<'a> { env: Option<&'a str>, @@ -231,7 +220,7 @@ where fn encode_payload]>>( writer: &mut W, traces: &[S], - metadata: &PayloadMetadata, + metadata: &TracerMetadata, ) -> Result<(), ValueWriteError> { let mut table = StringTable::new(); let payload_attrs = extract_payload_attrs(traces); @@ -241,7 +230,7 @@ fn encode_payload]>>( let has_attributes = attr_count > 0; let map_len = 1u32 // chunks always present - + (!metadata.language_name.is_empty()) as u32 + + (!metadata.language.is_empty()) as u32 + (!metadata.language_version.is_empty()) as u32 + (!metadata.tracer_version.is_empty()) as u32 + (!metadata.runtime_id.is_empty()) as u32 @@ -252,24 +241,24 @@ fn encode_payload]>>( write_map_len(writer, map_len)?; - if !metadata.language_name.is_empty() { + if !metadata.language.is_empty() { write_uint8(writer, trace_key::LANGUAGE_NAME)?; - table.write_interned(writer, metadata.language_name)?; + table.write_interned(writer, &metadata.language)?; } if !metadata.language_version.is_empty() { write_uint8(writer, trace_key::LANGUAGE_VERSION)?; - table.write_interned(writer, metadata.language_version)?; + table.write_interned(writer, &metadata.language_version)?; } if !metadata.tracer_version.is_empty() { write_uint8(writer, trace_key::TRACER_VERSION)?; - table.write_interned(writer, metadata.tracer_version)?; + table.write_interned(writer, &metadata.tracer_version)?; } if !metadata.runtime_id.is_empty() { write_uint8(writer, trace_key::RUNTIME_ID)?; - table.write_interned(writer, metadata.runtime_id)?; + table.write_interned(writer, &metadata.runtime_id)?; } if let Some(env) = payload_attrs.env { @@ -373,7 +362,7 @@ pub fn write_to_slice]>>( // &mut &mut [u8] lets the caller see the slice shrink as bytes are written. slice: &mut &mut [u8], traces: &[S], - metadata: &PayloadMetadata, + metadata: &TracerMetadata, ) -> Result<(), ValueWriteError> { encode_payload(slice, traces, metadata) } @@ -381,7 +370,7 @@ pub fn write_to_slice]>>( /// Serializes traces into a `Vec` using the V1 msgpack format. pub fn to_vec]>>( traces: &[S], - metadata: &PayloadMetadata, + metadata: &TracerMetadata, ) -> Vec { to_vec_with_capacity(traces, 0, metadata) } @@ -390,7 +379,7 @@ pub fn to_vec]>>( pub fn to_vec_with_capacity]>>( traces: &[S], capacity: u32, - metadata: &PayloadMetadata, + metadata: &TracerMetadata, ) -> Vec { let mut buf = ByteBuf::with_capacity(capacity as usize); let _ = encode_payload(&mut buf, traces, metadata); // infallible: ByteBuf write never fails @@ -400,7 +389,7 @@ pub fn to_vec_with_capacity]>>( /// Returns the number of bytes the V1 payload for `traces` would occupy. pub fn to_encoded_byte_len]>>( traces: &[S], - metadata: &PayloadMetadata, + metadata: &TracerMetadata, ) -> u32 { let mut counter = super::CountLength(0); let _ = encode_payload(&mut counter, traces, metadata); // infallible: CountLength write never fails @@ -438,14 +427,14 @@ mod tests { fn test_to_vec_non_empty() { let spans = vec![make_span("svc", "op", 42, 1, 0)]; let traces = vec![spans]; - let encoded = to_vec(&traces, &PayloadMetadata::default()); + let encoded = to_vec(&traces, &TracerMetadata::default()); assert!(!encoded.is_empty()); } #[test] fn test_to_vec_empty_traces() { let traces: Vec> = vec![]; - let encoded = to_vec(&traces, &PayloadMetadata::default()); + let encoded = to_vec(&traces, &TracerMetadata::default()); // Must still produce a valid msgpack map with an empty chunks array. assert!(!encoded.is_empty()); } @@ -461,8 +450,8 @@ mod tests { let s_single = make_span("my-service", "op1", 1, 1, 0); let traces_single = vec![vec![s_single]]; - let encoded_two = to_vec(&traces_two, &PayloadMetadata::default()); - let encoded_single = to_vec(&traces_single, &PayloadMetadata::default()); + let encoded_two = to_vec(&traces_two, &TracerMetadata::default()); + let encoded_single = to_vec(&traces_single, &TracerMetadata::default()); // The two-trace payload should be less than 2× the single-trace payload // if interning is working (the second "my-service" is encoded as an integer). @@ -498,7 +487,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![root]], &PayloadMetadata::default()); + let encoded = to_vec(&[vec![root]], &TracerMetadata::default()); assert!(!encoded.is_empty()); // The payload must contain "lambda" somewhere (the origin string). let lambda_bytes = b"lambda"; @@ -517,7 +506,7 @@ mod tests { make_span("svc", "child", 1, 2, 1), ]; let traces = vec![spans]; - let meta = PayloadMetadata::default(); + let meta = TracerMetadata::default(); let encoded = to_vec(&traces, &meta); let len = to_encoded_byte_len(&traces, &meta); assert_eq!(encoded.len() as u32, len); @@ -543,7 +532,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![root]], &PayloadMetadata::default()); + let encoded = to_vec(&[vec![root]], &TracerMetadata::default()); assert!(!encoded.is_empty()); } @@ -576,7 +565,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![span]], &PayloadMetadata::default()); + let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); let prod_bytes = b"prod"; assert!( encoded.windows(prod_bytes.len()).any(|w| w == prod_bytes), @@ -614,7 +603,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![span]], &PayloadMetadata::default()); + let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); // Both attribute strings must appear in the payload bytes. let ssi_bytes = b"ssi"; @@ -644,7 +633,7 @@ mod tests { fn test_payload_attributes_absent_when_no_relevant_tags() { // A span with no _dd.apm_mode or _dd.git.commit.sha must not produce key 10. let span = make_span("svc", "op", 1, 1, 0); - let encoded = to_vec(&[vec![span]], &PayloadMetadata::default()); + let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); let apm_key = b"_dd.apm_mode"; assert!( !encoded.windows(apm_key.len()).any(|w| w == apm_key), @@ -655,11 +644,12 @@ mod tests { #[test] fn test_payload_metadata_fields_present() { let span = make_span("svc", "op", 1, 1, 0); - let metadata = PayloadMetadata { - language_name: "python", - language_version: "3.11", - tracer_version: "2.0.0", - runtime_id: "abc-123-uuid", + let metadata = TracerMetadata { + language: "python".to_string(), + language_version: "3.11".to_string(), + tracer_version: "2.0.0".to_string(), + runtime_id: "abc-123-uuid".to_string(), + ..Default::default() }; let encoded = to_vec(&[vec![span]], &metadata); @@ -677,12 +667,12 @@ mod tests { let span = make_span("svc", "op", 1, 1, 0); let encoded_with = to_vec( &[vec![span.clone()]], - &PayloadMetadata { - language_name: "go", + &TracerMetadata { + language: "go".to_string(), ..Default::default() }, ); - let encoded_without = to_vec(&[vec![span]], &PayloadMetadata::default()); + let encoded_without = to_vec(&[vec![span]], &TracerMetadata::default()); // Payload with metadata must be larger (it carries extra fields). assert!(encoded_with.len() > encoded_without.len()); } diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index 81300184c2..d358406ea0 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -475,7 +475,6 @@ mod tests { lang_vendor: "vendor", tracer_version: "1.0", container_id: "id", - runtime_id: "", client_computed_top_level: false, client_computed_stats: false, dropped_p0_traces: 0, diff --git a/libdd-trace-utils/src/tracer_header_tags.rs b/libdd-trace-utils/src/tracer_header_tags.rs index 636e72a883..b4a2cda896 100644 --- a/libdd-trace-utils/src/tracer_header_tags.rs +++ b/libdd-trace-utils/src/tracer_header_tags.rs @@ -27,7 +27,6 @@ pub struct TracerHeaderTags<'a> { pub lang_vendor: &'a str, pub tracer_version: &'a str, pub container_id: &'a str, - pub runtime_id: &'a str, // specifies that the client has marked top-level spans, when set. If the header is present // this value will resolve to 'true' pub client_computed_top_level: bool, @@ -168,7 +167,6 @@ mod tests { lang_vendor: "vendor", tracer_version: "1.0", container_id: "id", - runtime_id: "", client_computed_top_level: true, client_computed_stats: true, dropped_p0_traces: 12, @@ -205,7 +203,6 @@ mod tests { lang_vendor: "vendor", tracer_version: "1.0", container_id: "", - runtime_id: "", client_computed_top_level: false, client_computed_stats: false, dropped_p0_spans: 0, diff --git a/libdd-trace-utils/src/tracer_metadata.rs b/libdd-trace-utils/src/tracer_metadata.rs new file mode 100644 index 0000000000..7d67ee953d --- /dev/null +++ b/libdd-trace-utils/src/tracer_metadata.rs @@ -0,0 +1,44 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::tracer_header_tags::TracerHeaderTags; +use http::HeaderMap; + +#[derive(Clone, Default, Debug)] +pub struct TracerMetadata { + pub hostname: String, + pub env: String, + pub app_version: String, + pub runtime_id: String, + pub service: String, + pub tracer_version: String, + pub language: String, + pub language_version: String, + pub language_interpreter: String, + pub language_interpreter_vendor: String, + pub git_commit_sha: String, + pub process_tags: String, + pub client_computed_stats: bool, + pub client_computed_top_level: bool, +} + +impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> { + fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> { + TracerHeaderTags::<'_> { + lang: &tags.language, + lang_version: &tags.language_version, + tracer_version: &tags.tracer_version, + lang_interpreter: &tags.language_interpreter, + lang_vendor: &tags.language_interpreter_vendor, + client_computed_stats: tags.client_computed_stats, + client_computed_top_level: tags.client_computed_top_level, + ..Default::default() + } + } +} + +impl<'a> From<&'a TracerMetadata> for HeaderMap { + fn from(tags: &'a TracerMetadata) -> HeaderMap { + TracerHeaderTags::from(tags).into() + } +} From 2e9717ab826c68596527c0b96a06fb16cb4fba54 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 18 May 2026 15:05:23 +0200 Subject: [PATCH 09/13] fix: update dd apm test agent version --- libdd-trace-utils/src/test_utils/datadog_test_agent.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index e7236985ca..db74e6ed79 100644 --- a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -14,7 +14,7 @@ use std::str::FromStr; use std::time::Duration; const TEST_AGENT_IMAGE_NAME: &str = "ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent"; -const TEST_AGENT_IMAGE_TAG: &str = "v1.52.0"; +const TEST_AGENT_IMAGE_TAG: &str = "v1.54.2"; const TEST_AGENT_READY_MSG: &str = "INFO:ddapm_test_agent.agent:Trace request stall seconds setting set to 0.0."; From b1dbf1b6403320f597343e0fd3cdbe86af12d3bf Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 18 May 2026 15:58:07 +0200 Subject: [PATCH 10/13] fix: rollback test agent version --- libdd-trace-utils/src/test_utils/datadog_test_agent.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index db74e6ed79..279aca62c9 100644 --- a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -14,7 +14,7 @@ use std::str::FromStr; use std::time::Duration; const TEST_AGENT_IMAGE_NAME: &str = "ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent"; -const TEST_AGENT_IMAGE_TAG: &str = "v1.54.2"; +const TEST_AGENT_IMAGE_TAG: &str = "v1.39.0"; const TEST_AGENT_READY_MSG: &str = "INFO:ddapm_test_agent.agent:Trace request stall seconds setting set to 0.0."; From b7e778b25c9235a7459ea60fe98880da6a01ee53 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Tue, 19 May 2026 10:49:31 +0200 Subject: [PATCH 11/13] fix: rollback test agent changes --- libdd-trace-utils/src/test_utils/datadog_test_agent.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index 279aca62c9..dd8351493e 100644 --- a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -167,12 +167,6 @@ impl DatadogAgentContainerBuilder { )); } - // The image default is SNAPSHOT_CI=1 (CI mode). Override with the host's CI setting so - // local runs (CI unset → "0") can auto-generate missing snapshot files while CI pipelines - // (CI=1) validate against committed ones. - let snapshot_ci = std::env::var("CI").unwrap_or_else(|_| "0".to_string()); - env_vars.push(("SNAPSHOT_CI".to_string(), snapshot_ci)); - DatadogAgentContainerBuilder { mounts, env_vars, From 81e709f555d3179d388f4836b61c2daf8a0ba05b Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Tue, 19 May 2026 16:49:40 +0200 Subject: [PATCH 12/13] fix: address comments --- ...xporter_v04_to_v1_trace_snapshot_test.json | 3 +- .../src/msgpack_encoder/v1/mod.rs | 195 +++++++++++++++++- .../src/msgpack_encoder/v1/span_v04.rs | 76 ++++--- 3 files changed, 232 insertions(+), 42 deletions(-) diff --git a/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json b/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json index ae73ca83b3..039057d503 100644 --- a/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json +++ b/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json @@ -39,7 +39,8 @@ "_dd.p.tid": "0x0", "env": "test-env", "runtime-id": "test-runtime-id-value", - "service": "test-service" + "service": "test-service", + "span.kind": "internal" }, "metrics": { "_dd_metric1": 1.0, diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index 8f7bb783f8..86cef2e027 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -90,15 +90,21 @@ struct PayloadAttrs<'a> { fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span]>>( traces: &'a [S], + metadata: &'a TracerMetadata, ) -> PayloadAttrs<'a> where T::Text: 'a, { - let mut env = None; - let mut hostname = None; - let mut app_version = None; + // Prefer TracerMetadata (set once on the builder) over span scanning. Fall back to + // span meta only when the builder-level value is missing — e.g. v04 payloads where + // the SDK propagated these as span tags. + let mut env = (!metadata.env.is_empty()).then_some(metadata.env.as_str()); + let mut hostname = (!metadata.hostname.is_empty()).then_some(metadata.hostname.as_str()); + let mut app_version = + (!metadata.app_version.is_empty()).then_some(metadata.app_version.as_str()); + let mut git_commit_sha = + (!metadata.git_commit_sha.is_empty()).then_some(metadata.git_commit_sha.as_str()); let mut apm_mode = None; - let mut git_commit_sha = None; 'outer: for trace in traces { for span in trace.as_ref() { @@ -153,14 +159,25 @@ fn extract_chunk_attrs<'a, T: TraceData>(spans: &'a [Span]) -> ChunkAttrs<'a> where T::Text: 'a, { - let mut trace_id = 0u128; + // trace_id is invariant per chunk. The v04 wire format carries only the low 64 bits; + // the high 64 bits are propagated as the hex string meta tag "_dd.p.tid". + let trace_id = spans + .first() + .map(|s| { + let high = s + .meta + .get("_dd.p.tid") + .and_then(|v| u64::from_str_radix(v.borrow(), 16).ok()) + .unwrap_or(0); + ((high as u128) << 64) | s.trace_id + }) + .unwrap_or(0); + let mut sampling_priority = None; let mut origin = None; let mut sampling_mechanism = None; for span in spans { - trace_id = span.trace_id; - // Root span: either no parent in this chunk, or tagged _dd.top_level=1 (remote parent). let is_root = span.parent_id == 0 || span.metrics.get("_dd.top_level").copied().unwrap_or(0.0) == 1.0; @@ -223,7 +240,7 @@ fn encode_payload]>>( metadata: &TracerMetadata, ) -> Result<(), ValueWriteError> { let mut table = StringTable::new(); - let payload_attrs = extract_payload_attrs(traces); + let payload_attrs = extract_payload_attrs(traces, metadata); let attr_count = payload_attrs.apm_mode.is_some() as u32 + payload_attrs.git_commit_sha.is_some() as u32; @@ -676,4 +693,166 @@ mod tests { // Payload with metadata must be larger (it carries extra fields). assert!(encoded_with.len() > encoded_without.len()); } + + #[test] + fn test_128bit_trace_id_from_dd_p_tid() { + let mut meta = HashMap::new(); + meta.insert( + BytesString::from_static("_dd.p.tid"), + BytesString::from_static("640cfd5400000000"), + ); + let span = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 0x0123456789abcdef, + span_id: 1, + parent_id: 0, + start: 1000, + duration: 100, + meta, + ..Default::default() + }; + let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); + + // Expected 16-byte BE: high = 0x640cfd5400000000, low = 0x0123456789abcdef + let expected = [ + 0x64, 0x0c, 0xfd, 0x54, 0x00, 0x00, 0x00, 0x00, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, + 0xcd, 0xef, + ]; + assert!( + encoded.windows(16).any(|w| w == expected), + "128-bit trace_id big-endian bytes should appear in payload" + ); + // _dd.p.tid must not also leak into span attributes. + let tid_key = b"_dd.p.tid"; + assert!( + !encoded.windows(tid_key.len()).any(|w| w == tid_key), + "_dd.p.tid should be consumed, not encoded as a span attribute" + ); + } + + #[test] + fn test_128bit_trace_id_without_dd_p_tid() { + // Absent _dd.p.tid → high 64 bits zero. + let span = make_span("svc", "op", 0x0123456789abcdef, 1, 0); + let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); + let expected = [ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, + 0xcd, 0xef, + ]; + assert!( + encoded.windows(16).any(|w| w == expected), + "absent _dd.p.tid should yield zero high 64 bits" + ); + } + + #[test] + fn test_sampling_mechanism_negative_value() { + // `_dd.p.dm` is a signed integer stored as a string (e.g. "-4" → manual rule). + // The encoder must parse it, take unsigned_abs, and emit it at chunk level. + let mut meta = HashMap::new(); + meta.insert( + BytesString::from_static("_dd.p.dm"), + BytesString::from_static("-4"), + ); + let root = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 1, + span_id: 1, + parent_id: 0, + start: 1000, + duration: 100, + meta, + ..Default::default() + }; + let encoded = to_vec(&[vec![root]], &TracerMetadata::default()); + + // The chunk-level sampling_mechanism (key 7) must be encoded as uint 4. + // The byte sequence is `chunk_key::SAMPLING_MECHANISM (0x07)` followed by the + // msgpack representation of 4 (positive fixint 0x04). + let expected = [chunk_key::SAMPLING_MECHANISM, 0x04]; + assert!( + encoded.windows(2).any(|w| w == expected), + "sampling_mechanism should be encoded as unsigned_abs(\"-4\") = 4" + ); + } + + #[test] + fn test_chunk_attrs_fallback_no_root_span() { + // Partial flush: no root span (every span has a non-zero parent and no + // `_dd.top_level`). Values must be accumulated from non-root spans. + let mut meta1 = HashMap::new(); + meta1.insert( + BytesString::from_static("_dd.origin"), + BytesString::from_static("lambda"), + ); + let mut metrics2 = HashMap::new(); + metrics2.insert(BytesString::from_static("_sampling_priority_v1"), 2.0f64); + let mut meta3 = HashMap::new(); + meta3.insert( + BytesString::from_static("_dd.p.dm"), + BytesString::from_static("-3"), + ); + + let s1 = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op1").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 1, + span_id: 11, + parent_id: 10, // non-zero parent → not a root + start: 1000, + duration: 100, + meta: meta1, + ..Default::default() + }; + let s2 = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op2").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 1, + span_id: 12, + parent_id: 11, + start: 1000, + duration: 100, + metrics: metrics2, + ..Default::default() + }; + let s3 = SpanBytes { + service: BytesString::from_slice(b"svc").unwrap(), + name: BytesString::from_slice(b"op3").unwrap(), + resource: BytesString::from_slice(b"res").unwrap(), + trace_id: 1, + span_id: 13, + parent_id: 12, + start: 1000, + duration: 100, + meta: meta3, + ..Default::default() + }; + let encoded = to_vec(&[vec![s1, s2, s3]], &TracerMetadata::default()); + + // Each attribute must be present at chunk level — collected from a different + // non-root span. + let lambda = b"lambda"; + assert!( + encoded.windows(lambda.len()).any(|w| w == lambda), + "origin 'lambda' from span 1 should appear in payload" + ); + // priority 2 → msgpack positive fixint 0x02 preceded by PRIORITY key + let prio = [chunk_key::PRIORITY, 0x02]; + assert!( + encoded.windows(2).any(|w| w == prio), + "sampling_priority 2 from span 2 should appear" + ); + // sampling_mechanism = unsigned_abs("-3") = 3 → 0x03 preceded by SAMPLING_MECHANISM key + let mech = [chunk_key::SAMPLING_MECHANISM, 0x03]; + assert!( + encoded.windows(2).any(|w| w == mech), + "sampling_mechanism 3 from span 3 should appear" + ); + } } diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index 9c60f573da..d50916ca05 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -66,14 +66,17 @@ pub(super) enum AnyValueKey { } /// Maps the `span.kind` string tag (from v0.4 meta) to the OTEL SpanKind uint32. -fn span_kind_from_str(s: &str) -> Option { +/// +/// Per the OTEL spec, missing or unrecognized values default to `Internal` (1) — this +/// matches the agent's behavior in `pkg/trace/api/converter.go`. +fn span_kind_from_str(s: &str) -> u32 { match s { - "internal" => Some(1), - "server" => Some(2), - "client" => Some(3), - "producer" => Some(4), - "consumer" => Some(5), - _ => None, + "server" => 2, + "client" => 3, + "producer" => 4, + "consumer" => 5, + // "internal" and any other string fall through to Internal. + _ => 1, } } @@ -238,38 +241,43 @@ pub fn encode_span( span: &Span, table: &mut StringTable, ) -> Result<(), ValueWriteError> { - // Extract promoted fields from meta — these get dedicated span-level keys and must - // not appear in the attributes array. - let env = span.meta.get("env").map(|v| v.borrow()); - let version = span.meta.get("version").map(|v| v.borrow()); - let component = span.meta.get("component").map(|v| v.borrow()); - let kind = span - .meta - .get("span.kind") - .and_then(|v| span_kind_from_str(v.borrow())); - - let is_promoted = - |k: &T::Text| matches!(k.borrow(), "env" | "version" | "component" | "span.kind"); + let is_parent = span.parent_id != 0; + let has_duration = span.duration != 0; + let has_error = span.error != 0; + // Extract promoted fields from meta — these get dedicated span-level keys and must + // not appear in the attributes array. `_dd.p.tid` is consumed to reconstruct the + // 128-bit chunk-level trace_id and is dropped here so it doesn't appear twice. + let is_promoted = |k: &T::Text| { + matches!( + k.borrow(), + "env" | "version" | "component" | "span.kind" | "_dd.p.tid" + ) + }; let non_promoted_meta = span.meta.iter().filter(|(k, _)| !is_promoted(k)).count() as u32; let attr_count = non_promoted_meta + span.metrics.len() as u32 + span.meta_struct.len() as u32; let has_attributes = attr_count > 0; - let span_len = 2 // span_id, start — always present + let env = span.meta.get("env").map(|v| v.borrow()); + let version = span.meta.get("version").map(|v| v.borrow()); + let component = span.meta.get("component").map(|v| v.borrow()); + // span.kind is always emitted — defaults to Internal per OTEL spec. + let kind = span_kind_from_str(span.meta.get("span.kind").map(|v| v.borrow()).unwrap_or("")); + + let span_len = 3 // span_id, start, kind — always present + (!span.service.borrow().is_empty()) as u32 + (!span.name.borrow().is_empty()) as u32 + (!span.resource.borrow().is_empty()) as u32 + (!span.r#type.borrow().is_empty()) as u32 - + (span.parent_id != 0) as u32 - + (span.duration != 0) as u32 - + (span.error != 0) as u32 + + is_parent as u32 + + has_duration as u32 + + has_error as u32 + has_attributes as u32 + (!span.span_links.is_empty()) as u32 + (!span.span_events.is_empty()) as u32 + env.is_some() as u32 + version.is_some() as u32 - + component.is_some() as u32 - + kind.is_some() as u32; + + component.is_some() as u32; rmp::encode::write_map_len(writer, span_len)?; @@ -291,22 +299,26 @@ pub fn encode_span( write_uint8(writer, SpanKey::SpanId as u8)?; write_u64(writer, span.span_id)?; + // `start` and `duration` are stored as i64 but the V1 spec encodes them as u64. A + // negative tracer-side value would wrap to a large unsigned integer; tracers must + // never emit negatives and the agent does the same `uint64(...)` cast in + // `pkg/trace/api/converter.go`. write_uint8(writer, SpanKey::Start as u8)?; write_u64(writer, span.start as u64)?; - if span.parent_id != 0 { + if is_parent { write_uint8(writer, SpanKey::ParentId as u8)?; write_u64(writer, span.parent_id)?; } - if span.duration != 0 { + if has_duration { write_uint8(writer, SpanKey::Duration as u8)?; write_u64(writer, span.duration as u64)?; } - if span.error != 0 { + if has_error { write_uint8(writer, SpanKey::Error as u8)?; - write_bool(writer, span.error != 0).map_err(ValueWriteError::InvalidDataWrite)?; + write_bool(writer, has_error).map_err(ValueWriteError::InvalidDataWrite)?; } if !span.r#type.borrow().is_empty() { @@ -363,10 +375,8 @@ pub fn encode_span( write_uint8(writer, SpanKey::Component as u8)?; table.write_interned(writer, v)?; } - if let Some(k) = kind { - write_uint8(writer, SpanKey::Kind as u8)?; - write_uint(writer, k as u64)?; - } + write_uint8(writer, SpanKey::Kind as u8)?; + write_uint(writer, kind as u64)?; Ok(()) } From 28211e922125682c6718e6e2040686b9a2d4af64 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Tue, 19 May 2026 18:10:37 +0200 Subject: [PATCH 13/13] fix: fallback when negative number start and duration --- .../src/msgpack_encoder/v1/span_v04.rs | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index d50916ca05..2c5962f8f1 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -8,6 +8,7 @@ use rmp::encode::{ ValueWriteError, }; use std::borrow::Borrow; +use std::time; use super::StringTable; @@ -299,12 +300,19 @@ pub fn encode_span( write_uint8(writer, SpanKey::SpanId as u8)?; write_u64(writer, span.span_id)?; - // `start` and `duration` are stored as i64 but the V1 spec encodes them as u64. A - // negative tracer-side value would wrap to a large unsigned integer; tracers must - // never emit negatives and the agent does the same `uint64(...)` cast in - // `pkg/trace/api/converter.go`. write_uint8(writer, SpanKey::Start as u8)?; - write_u64(writer, span.start as u64)?; + if span.start < 0 { + // Fall back to wall-clock now (UNIX nanos). Matches the agent's + // `validateAndFixStartTime` which substitutes `time.Now().UnixNano()` + // for invalid start values. + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + write_u64(writer, now)?; + } else { + write_u64(writer, span.start as u64)?; + } if is_parent { write_uint8(writer, SpanKey::ParentId as u8)?; @@ -313,7 +321,11 @@ pub fn encode_span( if has_duration { write_uint8(writer, SpanKey::Duration as u8)?; - write_u64(writer, span.duration as u64)?; + if span.duration < 0 { + write_u64(writer, 0)?; + } else { + write_u64(writer, span.duration as u64)?; + } } if has_error {