diff --git a/libdd-data-pipeline/src/agent_info/fetcher.rs b/libdd-data-pipeline/src/agent_info/fetcher.rs index 865bd5a87c..f49bc9f472 100644 --- a/libdd-data-pipeline/src/agent_info/fetcher.rs +++ b/libdd-data-pipeline/src/agent_info/fetcher.rs @@ -160,7 +160,7 @@ async fn fetch_and_hash_response( /// ); /// // Start the fetcher on a shared runtime /// let runtime = libdd_shared_runtime::SharedRuntime::new()?; -/// runtime.spawn_worker(fetcher)?; +/// runtime.spawn_worker(fetcher, true)?; /// /// // Get the Arc to access the info /// let agent_info_arc = agent_info::get_agent_info(); @@ -573,7 +573,7 @@ mod single_threaded_tests { ); assert!(agent_info::get_agent_info().is_none()); let shared_runtime = SharedRuntime::new().unwrap(); - shared_runtime.spawn_worker(fetcher).unwrap(); + shared_runtime.spawn_worker(fetcher, true).unwrap(); // Wait until the info is fetched let start = std::time::Instant::now(); @@ -656,7 +656,7 @@ mod single_threaded_tests { AgentInfoFetcher::::new(endpoint, Duration::from_secs(3600)); let shared_runtime = SharedRuntime::new().unwrap(); - shared_runtime.spawn_worker(fetcher).unwrap(); + shared_runtime.spawn_worker(fetcher, true).unwrap(); // Create a mock HTTP response with the new agent state let response = http::Response::builder() @@ -737,7 +737,7 @@ mod single_threaded_tests { AgentInfoFetcher::::new(endpoint, Duration::from_secs(3600)); // Very long interval let shared_runtime = SharedRuntime::new().unwrap(); - shared_runtime.spawn_worker(fetcher).unwrap(); + shared_runtime.spawn_worker(fetcher, true).unwrap(); // Create a mock HTTP response with the same agent state let response = http::Response::builder() diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 86b7a302a7..99be90603f 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -329,7 +329,7 @@ mod tests { .set_debug_enabled(true) .build(); let handle = runtime - .spawn_worker(worker) + .spawn_worker(worker, true) .expect("Failed to spawn worker"); (client, handle) } diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 29c3d78657..e899b5523b 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -302,9 +302,14 @@ impl TraceExporterBuilder { let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); let (info_fetcher, info_response_observer) = AgentInfoFetcher::::new(info_endpoint.clone(), Duration::from_secs(5 * 60)); - let info_fetcher_handle = shared_runtime.spawn_worker(info_fetcher).map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) - })?; + let info_fetcher_handle = + shared_runtime + .spawn_worker(info_fetcher, false) + .map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })?; if let Some(bucket_size) = self.stats_bucket_size { stats = StatsComputationStatus::DisabledByAgent { bucket_size }; @@ -330,7 +335,7 @@ impl TraceExporterBuilder { }); match telemetry { Some(Ok((client, worker))) => { - let handle = shared_runtime.spawn_worker(worker).map_err(|e| { + let handle = shared_runtime.spawn_worker(worker, false).map_err(|e| { TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( e.to_string(), )) diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 6a15438248..1fbaf82d0d 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -114,7 +114,7 @@ fn create_and_start_stats_worker; #[derive(Debug)] struct WorkerEntry { id: u64, + restart_on_fork: bool, worker: PausableWorker, } @@ -191,12 +192,15 @@ impl SharedRuntime { /// /// The worker will be tracked by this SharedRuntime and will be paused/resumed /// during fork operations. + /// If `restart_on_fork` is true, the worker will be reset and restarted when calling + /// `after_fork_child` else the worker is dropped *without* calling `Worker::shutdown`. /// /// # Errors /// Returns an error if the runtime is not available or the worker cannot be started. pub fn spawn_worker( &self, worker: T, + restart_on_fork: bool, ) -> Result { let boxed_worker: BoxedWorker = Box::new(worker); debug!(?boxed_worker, "Spawning worker on SharedRuntime"); @@ -220,6 +224,7 @@ impl SharedRuntime { workers_guard.push(WorkerEntry { id: worker_id, + restart_on_fork, worker: pausable_worker, }); @@ -316,7 +321,9 @@ impl SharedRuntime { let mut workers_lock = self.workers.lock_or_panic(); - // Restart all workers in child process + // Drop workers not marked as restart on fork + workers_lock.retain(|entry| entry.restart_on_fork); + for worker_entry in workers_lock.iter_mut() { worker_entry.worker.reset(); worker_entry.worker.start(&runtime)?; @@ -445,7 +452,7 @@ mod tests { let shared_runtime = SharedRuntime::new().unwrap(); let (worker, receiver) = make_test_worker(); - let result = shared_runtime.spawn_worker(worker); + let result = shared_runtime.spawn_worker(worker, true); assert!(result.is_ok()); assert_eq!(shared_runtime.workers.lock_or_panic().len(), 1); @@ -464,7 +471,7 @@ mod tests { let shared_runtime = SharedRuntime::new().unwrap(); let (worker, receiver) = make_test_worker(); - let handle = shared_runtime.spawn_worker(worker).unwrap(); + let handle = shared_runtime.spawn_worker(worker, true).unwrap(); assert_eq!(shared_runtime.workers.lock_or_panic().len(), 1); // Wait for at least one run before stopping @@ -493,7 +500,7 @@ mod tests { let shared_runtime = SharedRuntime::new().unwrap(); let (worker, receiver) = make_test_worker(); - shared_runtime.spawn_worker(worker).unwrap(); + shared_runtime.spawn_worker(worker, true).unwrap(); // Let the worker run until state > 0 so that preservation is observable let mut state_before_fork = 0; @@ -524,7 +531,7 @@ mod tests { let shared_runtime = SharedRuntime::new().unwrap(); let (worker, receiver) = make_test_worker(); - shared_runtime.spawn_worker(worker).unwrap(); + shared_runtime.spawn_worker(worker, true).unwrap(); // Let the worker run until state > 0 so that the reset is observable let mut state_before_fork = 0; @@ -555,7 +562,7 @@ mod tests { let shared_runtime = SharedRuntime::new().unwrap(); let (worker, receiver) = make_test_worker(); - shared_runtime.spawn_worker(worker).unwrap(); + shared_runtime.spawn_worker(worker, true).unwrap(); // Wait for at least one run before shutting down receiver @@ -573,4 +580,32 @@ mod tests { } assert_eq!(last, -1); } + + #[test] + fn test_after_fork_child_drops_worker_not_restart_on_fork() { + let shared_runtime = SharedRuntime::new().unwrap(); + let (worker, receiver) = make_test_worker(); + + shared_runtime.spawn_worker(worker, false).unwrap(); + + // Wait for the worker to run at least once + receiver + .recv_timeout(Duration::from_secs(1)) + .expect("worker did not run"); + + shared_runtime.before_fork(); + // Drain buffered messages now that the worker is paused + while receiver.try_recv().is_ok() {} + + assert!(shared_runtime.after_fork_child().is_ok()); + + // Worker must be removed from the list + assert_eq!(shared_runtime.workers.lock_or_panic().len(), 0); + + // Worker must not produce any more messages (not restarted, not shut down) + assert!( + receiver.recv_timeout(Duration::from_millis(200)).is_err(), + "worker should not run or shut down after fork in child when restart_on_fork is false" + ); + } } diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index 8f1396aec1..e0130804cd 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -368,7 +368,7 @@ mod tests { ); let _handle = shared_runtime - .spawn_worker(stats_exporter) + .spawn_worker(stats_exporter, true) .expect("Failed to spawn worker"); // Wait for stats to be flushed @@ -409,7 +409,7 @@ mod tests { ); let _handle = shared_runtime - .spawn_worker(stats_exporter) + .spawn_worker(stats_exporter, true) .expect("Failed to spawn worker"); shared_runtime.shutdown(None).unwrap();