Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,9 @@ impl TraceExporterBuilder {
match input {
TraceExporterInputFormat::V04 => matches!(
output,
TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V05
TraceExporterOutputFormat::V04
| TraceExporterOutputFormat::V05
| TraceExporterOutputFormat::V1
),
TraceExporterInputFormat::V05 => matches!(output, TraceExporterOutputFormat::V05),
}
Expand Down
42 changes: 4 additions & 38 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub enum TraceExporterOutputFormat {
#[default]
V04,
V05,
V1,
Comment thread
anais-raison marked this conversation as resolved.
}

impl TraceExporterOutputFormat {
Expand All @@ -93,6 +94,7 @@ impl TraceExporterOutputFormat {
match self {
TraceExporterOutputFormat::V04 => "/v0.4/traces",
TraceExporterOutputFormat::V05 => "/v0.5/traces",
TraceExporterOutputFormat::V1 => "/v1.0/traces",
Comment thread
anais-raison marked this conversation as resolved.
},
)
}
Expand Down Expand Up @@ -127,44 +129,7 @@ fn add_path(url: &Uri, path: &str) -> Uri {
Uri::from_parts(parts).unwrap()
}

#[derive(Clone, Default, Debug)]
pub struct TracerMetadata {
pub hostname: String,
pub env: String,
pub app_version: String,
pub runtime_id: String,
pub service: String,
pub tracer_version: String,
pub language: String,
pub language_version: String,
pub language_interpreter: String,
pub language_interpreter_vendor: String,
pub git_commit_sha: String,
pub process_tags: String,
pub client_computed_stats: bool,
pub client_computed_top_level: bool,
}

impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> {
fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> {
TracerHeaderTags::<'_> {
lang: &tags.language,
lang_version: &tags.language_version,
tracer_version: &tags.tracer_version,
lang_interpreter: &tags.language_interpreter,
lang_vendor: &tags.language_interpreter_vendor,
client_computed_stats: tags.client_computed_stats,
client_computed_top_level: tags.client_computed_top_level,
..Default::default()
}
}
}

impl<'a> From<&'a TracerMetadata> for HeaderMap {
fn from(tags: &'a TracerMetadata) -> HeaderMap {
TracerHeaderTags::from(tags).into()
}
}
pub use libdd_trace_utils::tracer_metadata::TracerMetadata;

/// Handles for the background workers owned by a [`TraceExporter`].
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -633,6 +598,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
let prepared = match self.serializer.prepare_traces_payload(
traces,
header_tags,
&self.metadata,
self.agent_payload_response_version.as_ref(),
) {
Ok(p) => p,
Expand Down
20 changes: 0 additions & 20 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,26 +339,6 @@ pub(crate) fn is_stats_worker_active(client_side_stats: &ArcSwap<StatsComputatio
)
}

#[cfg(not(target_arch = "wasm32"))]
impl From<TracerMetadata> for StatsMetadata {
fn from(m: TracerMetadata) -> StatsMetadata {
StatsMetadata {
hostname: m.hostname,
env: m.env,
app_version: m.app_version,
runtime_id: m.runtime_id,
language: m.language,
lang_version: m.language_version,
lang_interpreter: m.language_interpreter,
lang_vendor: m.language_interpreter_vendor,
tracer_version: m.tracer_version,
git_commit_sha: m.git_commit_sha,
process_tags: m.process_tags,
service: m.service,
}
}
}

#[cfg(test)]
mod tests {
#[cfg(feature = "stats-obfuscation")]
Expand Down
56 changes: 42 additions & 14 deletions libdd-data-pipeline/src/trace_exporter/trace_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use libdd_trace_utils::msgpack_decoder::decode::error::DecodeError;
use libdd_trace_utils::msgpack_encoder;
use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::trace_utils::{self, TracerHeaderTags};
use libdd_trace_utils::tracer_metadata::TracerMetadata;
use libdd_trace_utils::tracer_payload;

/// Minimal capacity of fresh buffers allocated to encode traces, in bytes.
Expand Down Expand Up @@ -52,13 +53,14 @@ impl TraceSerializer {
&self,
traces: Vec<Vec<Span<T>>>,
header_tags: TracerHeaderTags,
metadata: &TracerMetadata,
agent_payload_response_version: Option<&AgentResponsePayloadVersion>,
) -> Result<PreparedTracesPayload, TraceExporterError> {
let payload = self.collect_and_process_traces(traces)?;
let chunks = payload.size();
let headers =
self.build_traces_headers(header_tags, chunks, agent_payload_response_version);
let mp_payload = self.serialize_payload(&payload)?;
let mp_payload = self.serialize_payload(&payload, metadata)?;

Ok(PreparedTracesPayload {
data: mp_payload,
Expand All @@ -72,13 +74,15 @@ impl TraceSerializer {
&self,
traces: Vec<Vec<Span<T>>>,
) -> Result<tracer_payload::TraceChunks<T>, TraceExporterError> {
let use_v05_format = match self.output_format {
TraceExporterOutputFormat::V05 => true,
TraceExporterOutputFormat::V04 => false,
};
trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
})
match self.output_format {
TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)),
format => {
let use_v05_format = matches!(format, TraceExporterOutputFormat::V05);
trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
})
}
}
}

/// Build HTTP headers for traces request
Expand All @@ -105,6 +109,7 @@ impl TraceSerializer {
fn serialize_payload<T: TraceData>(
&self,
payload: &tracer_payload::TraceChunks<T>,
metadata: &TracerMetadata,
) -> Result<Vec<u8>, TraceExporterError> {
let capacity = self
.previous_serialised_len
Expand All @@ -120,6 +125,9 @@ impl TraceSerializer {
.map_err(TraceExporterError::Serialization)?;
buff
}
tracer_payload::TraceChunks::V1(p) => {
msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata)
}
};
self.previous_serialised_len
.store(buff.len(), Ordering::Relaxed);
Expand Down Expand Up @@ -275,7 +283,7 @@ mod tests {
.collect_and_process_traces(original_traces.clone())
.unwrap();

let result = serializer.serialize_payload(&payload);
let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
assert!(result.is_ok());

let serialized = result.unwrap();
Expand Down Expand Up @@ -310,7 +318,7 @@ mod tests {
.collect_and_process_traces(original_traces.clone())
.unwrap();

let result = serializer.serialize_payload(&payload);
let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
assert!(result.is_ok());

let serialized = result.unwrap();
Expand Down Expand Up @@ -346,7 +354,12 @@ mod tests {
];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags, None);
let result = serializer.prepare_traces_payload(
traces,
header_tags,
&TracerMetadata::default(),
None,
);
assert!(result.is_ok());

let prepared = result.unwrap();
Expand All @@ -365,7 +378,12 @@ mod tests {
let traces = vec![vec![create_test_span()]];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags, None);
let result = serializer.prepare_traces_payload(
traces,
header_tags,
&TracerMetadata::default(),
None,
);
assert!(result.is_ok());

let prepared = result.unwrap();
Expand All @@ -381,7 +399,12 @@ mod tests {
let traces = vec![vec![create_test_span()]];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags, Some(&agent_version));
let result = serializer.prepare_traces_payload(
traces,
header_tags,
&TracerMetadata::default(),
Some(&agent_version),
);
assert!(result.is_ok());

let prepared = result.unwrap();
Expand All @@ -395,7 +418,12 @@ mod tests {
let traces: Vec<Vec<SpanBytes>> = vec![];
let header_tags = create_test_header_tags();

let result = serializer.prepare_traces_payload(traces, header_tags, None);
let result = serializer.prepare_traces_payload(
traces,
header_tags,
&TracerMetadata::default(),
None,
);
assert!(result.is_ok());

let prepared = result.unwrap();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[[
{
"name": "test_exporter_v04_v1_snapshot_root",
"service": "test-service",
"resource": "test-resource",
"trace_id": 0,
Comment thread
anais-raison marked this conversation as resolved.
"span_id": 1,
"parent_id": 0,
"type": "web",
"meta": {
"_dd.hostname": "my-host",
"_dd.origin": "lambda",
"_dd.p.dm": "-4",
"_dd.p.tid": "0x0",
"component": "http",
"env": "test-env",
"runtime-id": "test-runtime-id-value",
"service": "test-service",
"span.kind": "server",
"version": "1.0.0"
},
"metrics": {
"_dd.top_level": 1.0,
"_sampling_priority_v1": 1
},
"duration": 5,
"start": 0
},
{
"name": "test_exporter_v04_v1_snapshot_child",
"service": "test-service",
"resource": "test-resource",
"trace_id": 0,
"span_id": 2,
"parent_id": 1,
"meta": {
"_dd.origin": "lambda",
"_dd.p.dm": "-4",
"_dd.p.tid": "0x0",
"env": "test-env",
"runtime-id": "test-runtime-id-value",
"service": "test-service",
"span.kind": "internal"
},
"metrics": {
"_dd_metric1": 1.0,
"_dd_metric2": 2.0,
"_sampling_priority_v1": 1
},
"duration": 5,
"start": 1
}]]
74 changes: 74 additions & 0 deletions libdd-data-pipeline/tests/test_trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,80 @@ mod tracing_integration_tests {
test_agent.assert_snapshot(snapshot_name).await;
}

fn get_v04_to_v1_trace_snapshot_test_payload(name_prefix: &str) -> Vec<u8> {
// Root span: exercises chunk-level attrs (sampling priority, origin, mechanism)
// and span-level promoted fields (env, version, component, span.kind).
let mut root_span = create_test_json_span(1234, 12341, 0, 0, true);
root_span["name"] = json!(format!("{name_prefix}_root"));
root_span["type"] = json!("web");
root_span["meta"] = json!({
"env": "test-env",
"version": "1.0.0",
"component": "http",
"span.kind": "server",
"_dd.hostname": "my-host",
"_dd.origin": "lambda",
"_dd.p.dm": "-4",
"runtime-id": "test-runtime-id-value",
"service": "test-service",
});
root_span["metrics"] = json!({
"_sampling_priority_v1": 1.0,
"_dd.top_level": 1.0,
});

// Child span: exercises metrics and meta attributes without promoted fields.
let mut child_span = create_test_json_span(1234, 12342, 12341, 1, false);
child_span["name"] = json!(format!("{name_prefix}_child"));
child_span["metrics"] = json!({
"_dd_metric1": 1.0,
"_dd_metric2": 2.0,
});

rmp_serde::to_vec_named(&vec![vec![root_span, child_span]]).unwrap()
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn compare_v04_to_v1_trace_snapshot_test() {
let relative_snapshot_path = "libdd-data-pipeline/tests/snapshots/";
let snapshot_name = "compare_exporter_v04_to_v1_trace_snapshot_test";
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await;
let url = test_agent.get_base_uri().await;

test_agent.start_session(snapshot_name, None).await;

let task_result = task::spawn_blocking(move || {
let mut builder = TraceExporter::<NativeCapabilities>::builder();
builder
.set_url(url.to_string().as_ref())
.set_language("test-lang")
.set_language_version("2.0")
.set_language_interpreter_vendor("vendor")
.set_language_interpreter("interpreter")
.set_tracer_version("1.0")
.set_env("test_env")
.set_service("test")
.set_test_session_token(snapshot_name)
.set_input_format(TraceExporterInputFormat::V04)
.set_output_format(TraceExporterOutputFormat::V1);

let trace_exporter = builder
.build::<NativeCapabilities>()
.expect("Unable to build TraceExporter");

let data = get_v04_to_v1_trace_snapshot_test_payload("test_exporter_v04_v1_snapshot");

let response = trace_exporter.send(data.as_ref());
assert!(response.is_ok(), "send failed: {:?}", response.err());
})
.await;

assert!(task_result.is_ok());

test_agent.assert_snapshot(snapshot_name).await;
}

#[cfg_attr(miri, ignore)]
#[cfg(target_os = "linux")]
#[tokio::test]
Expand Down
Loading
Loading