Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions libdd-data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async fn fetch_and_hash_response<H: HttpClientTrait>(
/// );
/// // Start the fetcher on a shared runtime
/// let runtime = libdd_shared_runtime::SharedRuntime::new()?;
/// runtime.spawn_worker(fetcher)?;
/// runtime.spawn_worker(fetcher, true)?;
///
/// // Get the Arc to access the info
/// let agent_info_arc = agent_info::get_agent_info();
Expand Down Expand Up @@ -573,7 +573,7 @@ mod single_threaded_tests {
);
assert!(agent_info::get_agent_info().is_none());
let shared_runtime = SharedRuntime::new().unwrap();
shared_runtime.spawn_worker(fetcher).unwrap();
shared_runtime.spawn_worker(fetcher, true).unwrap();

// Wait until the info is fetched
let start = std::time::Instant::now();
Expand Down Expand Up @@ -656,7 +656,7 @@ mod single_threaded_tests {
AgentInfoFetcher::<NativeCapabilities>::new(endpoint, Duration::from_secs(3600));

let shared_runtime = SharedRuntime::new().unwrap();
shared_runtime.spawn_worker(fetcher).unwrap();
shared_runtime.spawn_worker(fetcher, true).unwrap();

// Create a mock HTTP response with the new agent state
let response = http::Response::builder()
Expand Down Expand Up @@ -737,7 +737,7 @@ mod single_threaded_tests {
AgentInfoFetcher::<NativeCapabilities>::new(endpoint, Duration::from_secs(3600)); // Very long interval

let shared_runtime = SharedRuntime::new().unwrap();
shared_runtime.spawn_worker(fetcher).unwrap();
shared_runtime.spawn_worker(fetcher, true).unwrap();

// Create a mock HTTP response with the same agent state
let response = http::Response::builder()
Expand Down
2 changes: 1 addition & 1 deletion libdd-data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ mod tests {
.set_debug_enabled(true)
.build();
let handle = runtime
.spawn_worker(worker)
.spawn_worker(worker, true)
.expect("Failed to spawn worker");
(client, handle)
}
Expand Down
13 changes: 9 additions & 4 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,14 @@ impl TraceExporterBuilder {
let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT));
let (info_fetcher, info_response_observer) =
AgentInfoFetcher::<H>::new(info_endpoint.clone(), Duration::from_secs(5 * 60));
let info_fetcher_handle = shared_runtime.spawn_worker(info_fetcher).map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
})?;
let info_fetcher_handle =
shared_runtime
.spawn_worker(info_fetcher, false)
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?;

if let Some(bucket_size) = self.stats_bucket_size {
stats = StatsComputationStatus::DisabledByAgent { bucket_size };
Expand All @@ -330,7 +335,7 @@ impl TraceExporterBuilder {
});
match telemetry {
Some(Ok((client, worker))) => {
let handle = shared_runtime.spawn_worker(worker).map_err(|e| {
let handle = shared_runtime.spawn_worker(worker, false).map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
Expand Down
2 changes: 1 addition & 1 deletion libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn create_and_start_stats_worker<H: HttpClientTrait + MaybeSend + Sync + 'static
);
let worker_handle = ctx
.shared_runtime
.spawn_worker(stats_exporter)
.spawn_worker(stats_exporter, false)
.map_err(|e| anyhow::anyhow!(e))?;

// Update the stats computation state with the new worker components.
Expand Down
47 changes: 41 additions & 6 deletions libdd-shared-runtime/src/shared_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type BoxedWorker = Box<dyn Worker + Sync>;
#[derive(Debug)]
struct WorkerEntry {
id: u64,
restart_on_fork: bool,
worker: PausableWorker<BoxedWorker>,
}

Expand Down Expand Up @@ -191,12 +192,15 @@ impl SharedRuntime {
///
/// The worker will be tracked by this SharedRuntime and will be paused/resumed
/// during fork operations.
/// If `restart_on_fork` is true, the worker will be reset and restarted when calling
/// `after_fork_child` else the worker is dropped *without* calling `Worker::shutdown`.
///
/// # Errors
/// Returns an error if the runtime is not available or the worker cannot be started.
pub fn spawn_worker<T: Worker + Sync + 'static>(
&self,
worker: T,
restart_on_fork: bool,
) -> Result<WorkerHandle, SharedRuntimeError> {
let boxed_worker: BoxedWorker = Box::new(worker);
debug!(?boxed_worker, "Spawning worker on SharedRuntime");
Expand All @@ -220,6 +224,7 @@ impl SharedRuntime {

workers_guard.push(WorkerEntry {
id: worker_id,
restart_on_fork,
worker: pausable_worker,
});

Expand Down Expand Up @@ -316,7 +321,9 @@ impl SharedRuntime {

let mut workers_lock = self.workers.lock_or_panic();

// Restart all workers in child process
// Drop workers not marked as restart on fork
workers_lock.retain(|entry| entry.restart_on_fork);

for worker_entry in workers_lock.iter_mut() {
worker_entry.worker.reset();
worker_entry.worker.start(&runtime)?;
Expand Down Expand Up @@ -445,7 +452,7 @@ mod tests {
let shared_runtime = SharedRuntime::new().unwrap();
let (worker, receiver) = make_test_worker();

let result = shared_runtime.spawn_worker(worker);
let result = shared_runtime.spawn_worker(worker, true);
assert!(result.is_ok());
assert_eq!(shared_runtime.workers.lock_or_panic().len(), 1);

Expand All @@ -464,7 +471,7 @@ mod tests {
let shared_runtime = SharedRuntime::new().unwrap();
let (worker, receiver) = make_test_worker();

let handle = shared_runtime.spawn_worker(worker).unwrap();
let handle = shared_runtime.spawn_worker(worker, true).unwrap();
assert_eq!(shared_runtime.workers.lock_or_panic().len(), 1);

// Wait for at least one run before stopping
Expand Down Expand Up @@ -493,7 +500,7 @@ mod tests {
let shared_runtime = SharedRuntime::new().unwrap();
let (worker, receiver) = make_test_worker();

shared_runtime.spawn_worker(worker).unwrap();
shared_runtime.spawn_worker(worker, true).unwrap();

// Let the worker run until state > 0 so that preservation is observable
let mut state_before_fork = 0;
Expand Down Expand Up @@ -524,7 +531,7 @@ mod tests {
let shared_runtime = SharedRuntime::new().unwrap();
let (worker, receiver) = make_test_worker();

shared_runtime.spawn_worker(worker).unwrap();
shared_runtime.spawn_worker(worker, true).unwrap();

// Let the worker run until state > 0 so that the reset is observable
let mut state_before_fork = 0;
Expand Down Expand Up @@ -555,7 +562,7 @@ mod tests {
let shared_runtime = SharedRuntime::new().unwrap();
let (worker, receiver) = make_test_worker();

shared_runtime.spawn_worker(worker).unwrap();
shared_runtime.spawn_worker(worker, true).unwrap();

// Wait for at least one run before shutting down
receiver
Expand All @@ -573,4 +580,32 @@ mod tests {
}
assert_eq!(last, -1);
}

#[test]
fn test_after_fork_child_drops_worker_not_restart_on_fork() {
let shared_runtime = SharedRuntime::new().unwrap();
let (worker, receiver) = make_test_worker();

shared_runtime.spawn_worker(worker, false).unwrap();

// Wait for the worker to run at least once
receiver
.recv_timeout(Duration::from_secs(1))
.expect("worker did not run");

shared_runtime.before_fork();
// Drain buffered messages now that the worker is paused
while receiver.try_recv().is_ok() {}

assert!(shared_runtime.after_fork_child().is_ok());

// Worker must be removed from the list
assert_eq!(shared_runtime.workers.lock_or_panic().len(), 0);

// Worker must not produce any more messages (not restarted, not shut down)
assert!(
receiver.recv_timeout(Duration::from_millis(200)).is_err(),
"worker should not run or shut down after fork in child when restart_on_fork is false"
);
}
}
4 changes: 2 additions & 2 deletions libdd-trace-stats/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ mod tests {
);

let _handle = shared_runtime
.spawn_worker(stats_exporter)
.spawn_worker(stats_exporter, true)
.expect("Failed to spawn worker");

// Wait for stats to be flushed
Expand Down Expand Up @@ -409,7 +409,7 @@ mod tests {
);

let _handle = shared_runtime
.spawn_worker(stats_exporter)
.spawn_worker(stats_exporter, true)
.expect("Failed to spawn worker");

shared_runtime.shutdown(None).unwrap();
Expand Down
Loading