diff --git a/datadog-ipc-macros/src/lib.rs b/datadog-ipc-macros/src/lib.rs index e85e30a883..dc3a69bf52 100644 --- a/datadog-ipc-macros/src/lib.rs +++ b/datadog-ipc-macros/src/lib.rs @@ -248,15 +248,39 @@ fn gen_serve_fn( .map(|(attrs, n, _)| quote! { #(#attrs)* #n }) .collect(); + let force_flush = if m.is_blocking { + quote! { true } + } else { + quote! { false } + }; + let response_code = if m.return_type.is_some() { + // Flush any buffered acks before sending the typed response, so the client's + // send_count/ack_count loop processes them before reading the real reply. quote! { + #[cfg(target_os = "linux")] + if __pending_acks > 0 { + datadog_ipc::send_acks_async(&async_fd, __pending_acks).await; + __pending_acks = 0; + } let result = handler.#name(peer, #(#field_names),*).await; let __resp_data = datadog_ipc::codec::encode(&result); datadog_ipc::send_raw_async(&async_fd, &__resp_data).await.ok(); } } else { + // On Linux, buffer up to 20 acks and flush in a single + // sendmmsg(2) syscall; on other platforms send each ack immediately. quote! { handler.#name(peer, #(#field_names),*).await; + #[cfg(target_os = "linux")] + { + __pending_acks += 1; + if #force_flush || __pending_acks >= datadog_ipc::ACK_BUFFER_SIZE { + datadog_ipc::send_acks_async(&async_fd, __pending_acks).await; + __pending_acks = 0; + } + } + #[cfg(not(target_os = "linux"))] // 1-byte ack: distinguishable from EOF (0 bytes from recvmsg on closed socket). datadog_ipc::send_raw_async(&async_fd, &[0u8]).await.ok(); } @@ -283,20 +307,24 @@ fn gen_serve_fn( return; } }; + // Pending 1-byte acks for fire-and-forget methods, flushed via sendmmsg(2) on Linux. + #[cfg(target_os = "linux")] + let mut __pending_acks: u32 = 0; loop { - let (buf, fds) = match datadog_ipc::recv_raw_async(&async_fd).await { - Ok(x) => x, + let (mut req, fds) = match datadog_ipc::recv_raw_async( + &async_fd, + |buf| datadog_ipc::codec::decode::<#enum_name>(buf), + ).await { + Ok((Ok(req), fds)) => (req, fds), + Ok((Err(_), _)) => { + ::tracing::warn!("IPC serve: failed to decode request"); + break; + } Err(e) => { ::tracing::trace!("IPC serve: recv (connection closed?): {e}"); break; } }; - let Ok(mut req) = - datadog_ipc::codec::decode::<#enum_name>(&buf) - else { - ::tracing::warn!("IPC serve: failed to decode request"); - break; - }; let mut __source = datadog_ipc::handles::FdSource::new(fds); if datadog_ipc::handles::TransferHandles::receive_handles( &mut req, diff --git a/datadog-ipc/src/client.rs b/datadog-ipc/src/client.rs index 395c852be3..b617979abd 100644 --- a/datadog-ipc/src/client.rs +++ b/datadog-ipc/src/client.rs @@ -30,8 +30,6 @@ pub struct IpcClientConn { recv_buf: Vec, /// Set to true when a fatal I/O error occurs on send or receive. closed: bool, - /// Skip draining when the caller already drained - drained_acks_since_send: bool, } impl IpcClientConn { @@ -42,7 +40,6 @@ impl IpcClientConn { ack_count: 0, recv_buf: vec![0u8; max_message_size() + HANDLE_SUFFIX_SIZE], closed: false, - drained_acks_since_send: true, } } @@ -64,18 +61,18 @@ impl IpcClientConn { self.send_count - self.ack_count } - /// Non-blocking drain of all pending acks for client side. Updates `ack_count`. + /// Non-blocking drain of pending acks up to the number of outstanding messages. + /// + /// Passing `outstanding()` as the bound avoids initialising more kernel structures than + /// needed and skips the final `WouldBlock` probe when all expected acks have arrived. + /// On Linux uses `recvmmsg` to batch-receive up to 100 acks per syscall. pub fn drain_acks(&mut self) { - self.drained_acks_since_send = true; - loop { - match self.conn.try_recv_raw(&mut self.recv_buf) { - Ok(_) => self.ack_count += 1, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) => { - warn!("drain_acks: connection error ({}), marking closed", e); - self.closed = true; - break; - } + let max = self.outstanding() as usize; + match self.conn.drain_acks_nonblocking(max) { + Ok(count) => self.ack_count += count as u64, + Err(e) => { + warn!("drain_acks: connection error ({}), marking closed", e); + self.closed = true; } } } @@ -85,11 +82,6 @@ impl IpcClientConn { /// Returns `false` if the socket would block (EAGAIN). /// `data` is unmodified after the call. pub fn try_send(&mut self, data: &mut Vec, fds: &[RawFd]) -> bool { - if !self.drained_acks_since_send { - self.drain_acks(); - } - self.drained_acks_since_send = false; - match self.conn.try_send_raw(data, fds) { Ok(()) => { self.send_count += 1; @@ -117,15 +109,20 @@ impl IpcClientConn { /// Blocking send + blocking receive of response. /// - /// Sends `data`/`fds` (blocking), then receives in a loop, skipping any - /// intermediate 0-byte acks for prior fire-and-forget messages, until the - /// ack for this specific send arrives. Returns the response bytes and any - /// transferred file descriptors. + /// Drains any pending fire-and-forget acks (non-blocking, batched on Linux via `recvmmsg`) + /// before sending, so the subsequent blocking recv loop only needs to wait for the single + /// response ack. Sends `data`/`fds` (blocking), then receives in a loop until the ack + /// for this specific send arrives. Returns the response bytes and any transferred file + /// descriptors. pub fn call( &mut self, data: &mut Vec, fds: &[RawFd], ) -> io::Result<(Vec, Vec)> { + self.drain_acks(); + if self.closed { + return Err(io::Error::from(io::ErrorKind::BrokenPipe)); + } self.conn.send_raw_blocking(data, fds).inspect_err(|e| { warn!("call: send failed ({}), marking closed", e); self.closed = true; diff --git a/datadog-ipc/src/lib.rs b/datadog-ipc/src/lib.rs index 1ea96791da..144fcb5442 100644 --- a/datadog-ipc/src/lib.rs +++ b/datadog-ipc/src/lib.rs @@ -17,6 +17,12 @@ pub mod client; pub mod codec; pub use client::IpcClientConn; +#[cfg(target_os = "linux")] +pub use platform::send_acks_async; + +/// Maximum number of 1-byte acks buffered per connection before a forced flush. +/// Must match the `MAX_BATCH` limit inside `send_acks_async`. +pub const ACK_BUFFER_SIZE: u32 = 20; pub use platform::{ max_message_size, AsyncConn, PeerCredentials, SeqpacketConn, SeqpacketListener, HANDLE_SUFFIX_SIZE, diff --git a/datadog-ipc/src/platform/unix/sockets/mod.rs b/datadog-ipc/src/platform/unix/sockets/mod.rs index bed3b15baf..722ca3bd1a 100644 --- a/datadog-ipc/src/platform/unix/sockets/mod.rs +++ b/datadog-ipc/src/platform/unix/sockets/mod.rs @@ -9,7 +9,10 @@ use nix::sys::socket::{recvmsg, sendmsg, AddressFamily, SockFlag, SockType}; pub use nix::sys::socket::{ControlMessage, ControlMessageOwned, MsgFlags, UnixAddr}; +#[cfg(target_os = "linux")] +use std::mem::MaybeUninit; use std::{ + cell::RefCell, io, os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, sync::atomic::{AtomicUsize, Ordering}, @@ -286,6 +289,93 @@ impl SeqpacketConn { recvmsg_raw(self.inner.as_raw_fd(), buf, MsgFlags::empty()) } + /// Non-blocking drain of up to `max` available ack messages. Returns the count drained. + /// + /// `max` should be `send_count - ack_count` (the number of outstanding unacknowledged + /// messages); passing a tight bound avoids initialising more kernel structures than needed + /// and lets the loop exit without a final `WouldBlock` syscall. + /// + /// Acks are always 1-byte payloads with no file descriptors, so a minimal per-slot buffer + /// suffices. On Linux, `recvmmsg(2)` batches up to 100 receives into a single syscall; on + /// other platforms individual `try_recv_raw` calls are used instead. + pub fn drain_acks_nonblocking(&self, max: usize) -> io::Result { + if max == 0 { + return Ok(0); + } + #[cfg(target_os = "linux")] + { + const BATCH: usize = 100; + // Only initialise as many slots as we could possibly need. + let batch = max.min(BATCH); + // SAFETY: bufs/iovs/msgs are written before being passed to recvmmsg. + // msg_name and msg_control must be null; all other fields are either set + // explicitly below or are output-only (msg_flags, msg_len). + let mut bufs = [const { MaybeUninit::<[u8; 1]>::uninit() }; BATCH]; + let mut iovs = [const { MaybeUninit::::uninit() }; BATCH]; + let mut msgs = [const { MaybeUninit::::uninit() }; BATCH]; + for i in 0..batch { + unsafe { + (*iovs[i].as_mut_ptr()).iov_base = bufs[i].as_mut_ptr() as *mut libc::c_void; + (*iovs[i].as_mut_ptr()).iov_len = 1; + (*msgs[i].as_mut_ptr()).msg_hdr.msg_name = std::ptr::null_mut(); + (*msgs[i].as_mut_ptr()).msg_hdr.msg_namelen = 0; + // addr_of_mut avoids creating a mutable reference to an array slot. + (*msgs[i].as_mut_ptr()).msg_hdr.msg_iov = iovs[i].as_mut_ptr(); + (*msgs[i].as_mut_ptr()).msg_hdr.msg_iovlen = 1; + (*msgs[i].as_mut_ptr()).msg_hdr.msg_control = std::ptr::null_mut(); + (*msgs[i].as_mut_ptr()).msg_hdr.msg_controllen = 0; + } + } + let fd = self.inner.as_raw_fd(); + let mut total = 0usize; + loop { + let this_batch = (max - total).min(batch) as libc::c_uint; + let n = unsafe { + libc::recvmmsg( + fd, + msgs[0].as_mut_ptr(), + this_batch, + libc::MSG_DONTWAIT as _, // seems to be inconsistent, sometimes i32, u32 + std::ptr::null_mut(), + ) + }; + if n < 0 { + let e = io::Error::last_os_error(); + return if e.kind() == io::ErrorKind::WouldBlock { + Ok(total) + } else { + Err(e) + }; + } + let n = n as usize; + for msg in msgs.iter().take(n) { + if unsafe { msg.assume_init_ref() }.msg_len == 0 { + return Err(io::Error::from(io::ErrorKind::BrokenPipe)); + } + } + total += n; + if n < this_batch as usize || total >= max { + return Ok(total); + } + } + } + #[cfg(not(target_os = "linux"))] + { + let mut buf = [0u8; 1]; + let mut total = 0usize; + loop { + if total >= max { + return Ok(total); + } + match self.try_recv_raw(&mut buf) { + Ok(_) => total += 1, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(total), + Err(e) => return Err(e), + } + } + } + } + /// Blocking receive. Polls for readability (respecting read_timeout), then receives. pub fn recv_raw_blocking(&self, buf: &mut [u8]) -> io::Result<(usize, Vec)> { let fd = self.inner.as_raw_fd(); @@ -352,22 +442,50 @@ pub type AsyncConn = AsyncFd; /// Async receive on a Tokio `AsyncFd`-wrapped IPC connection. /// -/// Allocates a buffer sized to `max_message_size()` per call and returns only the received -/// bytes (truncated), so no large buffer is held between receives. +/// Receives into a thread-local buffer pre-sized to `max_message_size()` (avoids a per-call +/// heap allocation of up to 4 MB), then calls `decode` with the received byte slice so the +/// caller can decode in-place without an intermediate `Vec`. Zero buffer allocations per +/// call once the thread-local has grown to full size. +/// +/// The buffer is borrowed only inside the synchronous `try_io` closure and is always released +/// before the next `.await`, so it is safe to reuse across concurrent tasks on the same thread +/// (Tokio's cooperative scheduler never runs two tasks simultaneously on the same thread). +/// +/// `decode` is wrapped in `Option` so that the `FnOnce` callback can be moved into the +/// `FnMut` closure required by `try_io`; it is only called once (on successful receive). /// /// Used by the server dispatch loop (generated by `#[service]` macro). -pub async fn recv_raw_async(fd: &AsyncConn) -> io::Result<(Vec, Vec)> { +pub async fn recv_raw_async(fd: &AsyncConn, decode: F) -> io::Result<(T, Vec)> +where + F: FnOnce(&[u8]) -> T, +{ + thread_local! { + /// Reusable receive buffer. Grows on first use; never shrinks. + static RECV_BUF: RefCell> = const { RefCell::new(Vec::new()) }; + } + // Wrap in Option to satisfy FnMut (take() is only called on successful receive). + let mut decode = Some(decode); loop { let mut guard = fd.readable().await?; - let mut buf = Vec::with_capacity(max_message_size()); - // SAFETY: all bit patterns are valid for u8; recvmsg writes exactly n bytes into - // the spare capacity before set_len(n) is called below. - let slice = unsafe { std::slice::from_raw_parts_mut(buf.as_mut_ptr(), max_message_size()) }; - match guard.try_io(|inner| recvmsg_raw(inner.as_raw_fd(), slice, MsgFlags::empty())) { - Ok(Ok((n, fds))) => { - unsafe { buf.set_len(n) }; - return Ok((buf, fds)); - } + match guard.try_io(|inner| { + RECV_BUF.with_borrow_mut(|buf| { + let size = max_message_size(); + if buf.len() < size { + buf.resize(size, 0u8); + } + match recvmsg_raw(inner.as_raw_fd(), buf, MsgFlags::empty()) { + Err(e) => Err(e), + Ok((n, fds)) => { + // recvmsg_raw's &mut borrow of buf has ended; safe to read &buf[..n]. + #[allow(clippy::unwrap_used)] // SAFETY: take() is only reached once + unsafe { + Ok((decode.take().unwrap_unchecked()(&buf[..n]), fds)) + } + } + } + }) + }) { + Ok(Ok(x)) => return Ok(x), Ok(Err(e)) => return Err(e), Err(_would_block) => continue, } @@ -388,3 +506,70 @@ pub async fn send_raw_async(fd: &AsyncConn, data: &[u8]) -> io::Result<()> { } } } + +/// Send `count` 1-byte ack messages using a single `sendmmsg(2)` syscall. +/// +/// Used by the server dispatch loop (Linux only) to flush batched acks for fire-and-forget +/// methods in one syscall instead of one `sendmsg` per call. Best-effort: errors are silently +/// dropped because the client's `send_count`/`ack_count` tracking handles missing acks by +/// waiting for all outstanding ones at the next blocking call. +#[cfg(target_os = "linux")] +pub async fn send_acks_async(fd: &AsyncConn, count: u32) { + const MAX_BATCH: usize = crate::ACK_BUFFER_SIZE as usize; + let count = (count as usize).min(MAX_BATCH); + if count == 0 { + return; + } + + // Only `offset` (a usize) lives across `.await` — keeping the future `Send`. + // The !Send iovec/mmsghdr arrays are constructed inside the synchronous try_io + // closure so they never appear in the async state machine. + let mut offset = 0usize; + loop { + let mut guard = match fd.writable().await { + Ok(g) => g, + Err(_) => return, + }; + match guard.try_io(|inner| { + // Build arrays here (inside the synchronous closure, not across .await) so that + // !Send *mut c_void pointers never appear in the async state machine. + // SAFETY: zeroed mmsghdr/iovec are valid initial states; null pointers in unused + // fields (msg_name, msg_control) are correct — the kernel ignores them for SEQPACKET. + let ack_byte: u8 = 0; + let mut iovs: [libc::iovec; MAX_BATCH] = unsafe { core::mem::zeroed() }; + let mut msgs: [libc::mmsghdr; MAX_BATCH] = unsafe { core::mem::zeroed() }; + let batch = count - offset; + for i in 0..batch { + iovs[i].iov_base = &ack_byte as *const u8 as *mut libc::c_void; + iovs[i].iov_len = 1; + msgs[i].msg_hdr.msg_iov = &mut iovs[i] as *mut libc::iovec; + msgs[i].msg_hdr.msg_iovlen = 1; + } + let n = unsafe { + libc::sendmmsg( + inner.as_raw_fd(), + msgs.as_mut_ptr(), + batch as libc::c_uint, + 0, + ) + }; + if n < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(n as usize) + } + }) { + Ok(Ok(sent)) => { + offset += sent; + if offset >= count { + return; + } + // partial send: loop to retry the remainder + } + // drop on error: + // the client may already have terminated and we're processing outstanding messages + Ok(Err(_)) => return, + Err(_would_block) => {} // re-register for writability + } + } +} diff --git a/datadog-ipc/src/platform/windows/sockets.rs b/datadog-ipc/src/platform/windows/sockets.rs index 57eb3ddf96..f0b6e007ee 100644 --- a/datadog-ipc/src/platform/windows/sockets.rs +++ b/datadog-ipc/src/platform/windows/sockets.rs @@ -23,17 +23,20 @@ //! bytes beyond the maximum expected payload size. use crate::platform::message::MAX_FDS; -use std::future::Future; -use std::io; -use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle}; -use std::path::Path; -use std::pin::Pin; -use std::ptr::{null, null_mut}; -use std::sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, - Arc, Mutex, -}; use std::task::{Context, Poll}; +use std::{ + cell::RefCell, + future::Future, + io, + os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle}, + path::Path, + pin::Pin, + ptr::{null, null_mut}, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; // winapi – only used for things not cleanly available in windows-sys use winapi::shared::minwindef::ULONG; @@ -648,6 +651,26 @@ impl SeqpacketConn { pipe_read(self.raw_handle(), buf, false) } + /// Non-blocking drain of up to `max` available ack messages. Returns the count drained. + /// + /// `max` should be `send_count - ack_count`. Acks are always 1-byte payloads with no + /// handles; the buffer is sized for the wire format: 1 payload byte + the 4-byte + /// handle-count suffix = `1 + HANDLE_SUFFIX_SIZE`. + pub fn drain_acks_nonblocking(&self, max: usize) -> io::Result { + let mut buf = [0u8; 1 + HANDLE_SUFFIX_SIZE]; + let mut total = 0usize; + loop { + if total >= max { + return Ok(total); + } + match self.try_recv_raw(&mut buf) { + Ok(_) => total += 1, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(total), + Err(e) => return Err(e), + } + } + } + /// Blocking receive. /// /// `buf` must be at least `payload_max + HANDLE_SUFFIX_SIZE` bytes. @@ -858,14 +881,32 @@ impl SeqpacketListener { /// Uses `block_in_place` + raw `ReadFile` to avoid mio's 4 KB internal read- /// buffer limit. For message-mode pipes a single `ReadFile` delivers the /// entire message. -pub async fn recv_raw_async(conn: &AsyncConn) -> io::Result<(Vec, Vec)> { +/// Receive one IPC message and decode it in-place using the supplied callback. +/// +/// Uses a thread-local buffer sized to `max_message_size() + HANDLE_SUFFIX_SIZE` so that +/// the per-call heap allocation for the receive buffer is eliminated once the thread-local +/// has grown to full size. `decode` is called synchronously inside `block_in_place` with +/// a slice of the received bytes before the buffer is made available for the next receive. +pub async fn recv_raw_async(conn: &AsyncConn, decode: F) -> io::Result<(T, Vec)> +where + F: FnOnce(&[u8]) -> T, +{ + thread_local! { + /// Reusable receive buffer. Grows on first use; never shrinks. + static RECV_BUF: RefCell> = const { RefCell::new(Vec::new()) }; + } let raw = conn.as_raw_handle() as SysHANDLE; tokio::task::block_in_place(|| { - let size = max_message_size() + HANDLE_SUFFIX_SIZE; - let mut buf = vec![0u8; size]; - let (payload_len, handles) = pipe_read(raw, &mut buf, true)?; - buf.truncate(payload_len); - Ok((buf, handles)) + RECV_BUF.with_borrow_mut(|buf| { + let size = max_message_size() + HANDLE_SUFFIX_SIZE; + if buf.len() < size { + buf.resize(size, 0u8); + } + match pipe_read(raw, buf, true) { + Err(e) => Err(e), + Ok((payload_len, handles)) => Ok((decode(&buf[..payload_len]), handles)), + } + }) }) } diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index 8ce4b0f449..d9c0cb79e0 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -87,7 +87,7 @@ impl SidecarTransport { pub fn set_backpressure(&mut self, max_bytes: usize, max_queue: u64) -> io::Result<()> { let mut sender = lock_sender(self)?; - sender.max_outstanding = max_queue; + sender.max_outstanding = max_queue.max(21); #[cfg(unix)] sender.channel.0.conn.set_sndbuf_size(max_bytes)?; #[cfg(not(unix))] diff --git a/datadog-sidecar/src/service/sender.rs b/datadog-sidecar/src/service/sender.rs index db1c9eb83b..c21d62edfa 100644 --- a/datadog-sidecar/src/service/sender.rs +++ b/datadog-sidecar/src/service/sender.rs @@ -169,6 +169,11 @@ impl SidecarSender { /// Non-blocking drain of the outbox. Returns `true` if all messages were sent. fn try_drain_outbox(&mut self) -> bool { + // Drain pending acks when approaching the throttle threshold so the socket + // receive buffer doesn't fill up and block the sidecar from sending more acks. + if self.channel.0.outstanding() >= self.max_outstanding / 2 { + self.channel.0.drain_acks(); + } for slot in self.outbox.slots_mut() { if let Some(msg) = slot { if self.channel.0.outstanding() >= self.max_outstanding { diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index a2be2752b6..33dda9a760 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -296,12 +296,24 @@ impl SidecarServer { let futures = clients .values() - .filter_map(|client| client.client.lock_or_panic().worker.stats().ok()) + .filter_map(|client| { + client + .client + .lock_or_panic() + .as_ref() + .and_then(|c| c.worker.stats().ok()) + }) .collect::>(); let metric_counts = clients .values() - .map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32) + .map(|client| { + client + .client + .lock_or_panic() + .as_ref() + .map_or(0, |c| c.telemetry_metrics.len() as u32) + }) .collect::>(); (futures, metric_counts) @@ -399,26 +411,47 @@ impl SidecarInterface for ConnectionSidecarHandler { let process_tags = session.process_tags.lock_or_panic().clone(); - // Lock telemetry client + // Pre-compute session config so both the primary and retry get_or_create calls + // can use it without re-locking the session. + let session_config = session + .session_config + .lock_or_panic() + .as_ref() + .cloned() + .unwrap_or_else(|| { + warn!("Failed to get telemetry session config for {instance_id:?}"); + Config::default() + }); + + // Get or create the telemetry client. If we observe None under the lock it means + // another thread called take() (Stop) in the narrow window between get_or_create + // returning and us acquiring the lock — retry once to get a fresh client. let telemetry_mutex = self.server.telemetry_clients.get_or_create( service, env, &instance_id, &runtime_metadata, - || { - session - .session_config - .lock_or_panic() - .as_ref() - .cloned() - .unwrap_or_else(|| { - warn!("Failed to get telemetry session config for {instance_id:?}"); - Config::default() - }) - }, - process_tags, + || session_config.clone(), + process_tags.clone(), ); - let mut telemetry = telemetry_mutex.lock_or_panic(); + let telemetry_mutex = if telemetry_mutex.lock_or_panic().is_none() { + self.server.telemetry_clients.get_or_create( + service, + env, + &instance_id, + &runtime_metadata, + || session_config, + process_tags, + ) + } else { + telemetry_mutex + }; + let mut telemetry_guard = telemetry_mutex.lock_or_panic(); + let Some(telemetry) = telemetry_guard.as_mut() else { + // Extremely rare: the client was stopped between the two get_or_create calls. + warn!("enqueue_actions: telemetry client stopped during retry for instance {instance_id:?}; dropping actions"); + return; + }; // Auto-register any metrics known to this connection but not yet registered // in this telemetry client (e.g., the client was just created for a new service/env). @@ -440,24 +473,24 @@ impl SidecarInterface for ConnectionSidecarHandler { for action in actions { match action { SidecarAction::Telemetry(TelemetryActions::AddIntegration(ref integration)) => { - if telemetry.buffered_integrations.insert(integration.clone()) { + if telemetry.shared.integrations.insert(integration.clone()) { actions_to_process.push(action); buffered_info_changed = true; } } SidecarAction::PhpComposerTelemetryFile(path) => { - if telemetry.buffered_composer_paths.insert(path.clone()) { + if telemetry.shared.composer_paths.insert(path.clone()) { composer_paths_to_process.push(path); buffered_info_changed = true; } } SidecarAction::Telemetry(TelemetryActions::AddConfig(_)) => { - telemetry.config_sent = true; + telemetry.shared.config_sent = true; buffered_info_changed = true; actions_to_process.push(action); } SidecarAction::Telemetry(TelemetryActions::AddEndpoint(_)) => { - telemetry.last_endpoints_push = SystemTime::now(); + telemetry.shared.last_endpoints_push = SystemTime::now(); buffered_info_changed = true; actions_to_process.push(action); } @@ -473,6 +506,18 @@ impl SidecarInterface for ConnectionSidecarHandler { } } + if buffered_info_changed { + info!( + "Buffered telemetry info changed for instance {instance_id:?} and queue_id {queue_id:?}" + ); + telemetry.write_shm_file(); + } + + // take() must happen INSIDE the spawned task, after process_actions completes, + // so that a Config batch spawned before a Stop batch still finds Some when it + // runs (the last_handle chain guarantees Stop runs after Config). + let do_take = remove_client; + if !actions_to_process.is_empty() { let telemetry_mutex_clone = telemetry_mutex.clone(); let worker = telemetry.worker.clone(); @@ -481,9 +526,17 @@ impl SidecarInterface for ConnectionSidecarHandler { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; - let processed = telemetry_mutex_clone - .lock_or_panic() - .process_actions(actions_to_process); + let processed = { + let mut guard = telemetry_mutex_clone.lock_or_panic(); + let processed = guard + .as_mut() + .map(|t| t.process_actions(actions_to_process)) + .unwrap_or_default(); + if do_take { + guard.take(); // drop client after Stop action is processed + } + processed + }; debug!("Sending Processed Actions :{processed:?}"); worker.send_msgs(processed).await.ok(); })); @@ -504,18 +557,14 @@ impl SidecarInterface for ConnectionSidecarHandler { })); } - if buffered_info_changed { - info!( - "Buffered telemetry info changed for instance {instance_id:?} and queue_id {queue_id:?}" - ); - telemetry.write_shm_file(); - } - + // telemetry borrow ends after the last use of telemetry.handle above. + // Remove from the map synchronously so new get_or_create calls get a fresh entry; + // take() is deferred to the spawned task to avoid racing with in-flight tasks. if remove_client { - info!("Removing telemetry client for instance {instance_id:?}"); self.server .telemetry_clients .remove_telemetry_client(service, env); + info!("Removing telemetry client for instance {instance_id:?}"); } } else { info!("No application found for instance {instance_id:?} and queue_id {queue_id:?}"); diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index e9beefedf9..9ab9d3fce0 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -25,7 +25,7 @@ use zwohash::ZwoHasher; use libdd_common::tag::Tag; use libdd_telemetry::worker::TelemetryWorkerBuilder; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::ops::Sub; use std::sync::LazyLock; use std::time::SystemTime; @@ -56,18 +56,34 @@ struct ComposerPackages { pub struct TelemetryCachedEntry { last_used: Instant, - pub client: Arc>, + pub client: Arc>>, } pub struct TelemetryCachedClient { pub worker: TelemetryWorkerHandle, pub shm_writer: OneWayShmWriter, - pub config_sent: bool, - pub buffered_integrations: HashSet, - pub buffered_composer_paths: HashSet, - pub last_endpoints_push: SystemTime, pub telemetry_metrics: HashMap, pub handle: Option>, + pub shared: TelemetryCachedClientShmData, +} + +#[derive(Deserialize, Serialize)] +pub struct TelemetryCachedClientShmData { + pub config_sent: bool, + pub integrations: HashSet, + pub composer_paths: HashSet, + pub last_endpoints_push: SystemTime, +} + +impl Default for TelemetryCachedClientShmData { + fn default() -> Self { + TelemetryCachedClientShmData { + config_sent: false, + integrations: HashSet::new(), + composer_paths: HashSet::new(), + last_endpoints_push: SystemTime::UNIX_EPOCH, + } + } } impl TelemetryCachedClient { @@ -76,7 +92,7 @@ impl TelemetryCachedClient { env: &str, instance_id: &InstanceId, runtime_meta: &RuntimeMetadata, - get_config: impl FnOnce() -> libdd_telemetry::config::Config, + get_config: impl FnOnce() -> Config, process_tags: Vec, ) -> Self { let mut builder = TelemetryWorkerBuilder::new_fetch_host( @@ -99,30 +115,30 @@ impl TelemetryCachedClient { builder.config = config.clone(); let (handle, _join) = builder.spawn(); + info!("spawned telemetry worker {config:?}"); + + let worker = handle.clone(); + tokio::spawn(async move { + worker + .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await + .ok(); + }); - info!("spawning telemetry worker {config:?}"); Self { - worker: handle.clone(), + worker: handle, shm_writer: { #[allow(clippy::unwrap_used)] OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap() }, - config_sent: false, - buffered_integrations: HashSet::new(), - buffered_composer_paths: HashSet::new(), - last_endpoints_push: SystemTime::UNIX_EPOCH, + shared: TelemetryCachedClientShmData::default(), telemetry_metrics: Default::default(), handle: None, } } pub fn write_shm_file(&self) { - if let Ok(buf) = bincode::serialize(&( - &self.config_sent, - &self.buffered_integrations, - &self.buffered_composer_paths, - &self.last_endpoints_push, - )) { + if let Ok(buf) = bincode::serialize(&self.shared) { self.shm_writer.write(&buf); } else { warn!("Failed to serialize telemetry data for shared memory"); @@ -219,7 +235,10 @@ impl TelemetryCachedClient { // cheap way to avoid unbounded caching const CACHE_INTERVAL: u64 = 2000; let last_clean = LAST_CACHE_CLEAN.load(Ordering::Relaxed); - let now_secs = Instant::now().elapsed().as_secs(); + let now_secs = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); if now_secs > last_clean + CACHE_INTERVAL && LAST_CACHE_CLEAN .compare_exchange( @@ -305,57 +324,42 @@ impl TelemetryCachedClientSet { runtime_meta: &RuntimeMetadata, get_config: F, process_tags: Vec, - ) -> Arc> + ) -> Arc>> where - F: FnOnce() -> libdd_telemetry::config::Config, + F: FnOnce() -> Config, { let key = (service.to_string(), env.to_string()); let mut map = self.inner.lock_or_panic(); if let Some(existing) = map.get_mut(&key) { - existing.last_used = Instant::now(); - tokio::spawn({ - let worker = existing.client.lock_or_panic().worker.clone(); - async move { - worker - .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) - .await - .ok(); - } - }); - - info!("Reusing existing telemetry client for {key:?}"); - return existing.client.clone(); + if existing.client.lock_or_panic().is_some() { + existing.last_used = Instant::now(); + info!("Reusing existing telemetry client for {key:?}"); + return existing.client.clone(); + } + // Dead (None) entry — fall through to replace it with a fresh client. } - let entry = TelemetryCachedEntry { - last_used: Instant::now(), - client: Arc::new(Mutex::new(TelemetryCachedClient::new( - service, - env, - instance_id, - runtime_meta, - get_config, - process_tags, - ))), - }; - - let entry = map.entry(key.clone()).or_insert(entry); - - tokio::spawn({ - let worker = entry.client.lock_or_panic().worker.clone(); - async move { - worker - .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) - .await - .ok(); - } - }); + let new_client = Arc::new(Mutex::new(Some(TelemetryCachedClient::new( + service, + env, + instance_id, + runtime_meta, + get_config, + process_tags, + )))); + map.insert( + key.clone(), + TelemetryCachedEntry { + last_used: Instant::now(), + client: new_client.clone(), + }, + ); info!("Created new telemetry client for {key:?}"); - entry.client.clone() + new_client } pub fn remove_telemetry_client(&self, service: &str, env: &str) { @@ -425,7 +429,13 @@ pub(crate) async fn telemetry_action_receiver_task( &actions.service_name, &actions.env_name, ); - let client = telemetry_client.lock_or_panic().worker.clone(); + let Some(client) = telemetry_client + .lock_or_panic() + .as_ref() + .map(|t| t.worker.clone()) + else { + continue; + }; for it_action in actions.actions { match it_action { @@ -442,14 +452,17 @@ pub(crate) async fn telemetry_action_receiver_task( } InternalTelemetryAction::RegisterTelemetryMetric(metric) => { debug!("Registered telemetry metric: {metric:?}"); - telemetry_client.lock_or_panic().register_metric(metric); + if let Some(t) = telemetry_client.lock_or_panic().as_mut() { + t.register_metric(metric); + } } InternalTelemetryAction::AddMetricPoint((value, name, tags)) => { let metric_name = name.clone(); let actions_point_opt = { telemetry_client .lock_or_panic() - .to_telemetry_point((name, value, tags)) + .as_ref() + .and_then(|t| t.to_telemetry_point((name, value, tags))) }; if let Some(actions_point) = actions_point_opt { match client.send_msg(actions_point).await { @@ -473,7 +486,7 @@ fn get_telemetry_client( instance_id: &InstanceId, service_name: &str, env_name: &str, -) -> Arc> { +) -> Arc>> { let session = sidecar.get_session(&instance_id.session_id); let trace_config = session.get_trace_config(); let runtime_meta = RuntimeMetadata::new( diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 2bcbd0163b..d978c74d92 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -242,10 +242,9 @@ impl Add for TelemetryWorkerStats { buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets, series: self.metric_buckets.series + rhs.metric_buckets.series, series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points, - distributions: self.metric_buckets.distributions - + self.metric_buckets.distributions, + distributions: self.metric_buckets.distributions + rhs.metric_buckets.distributions, distributions_points: self.metric_buckets.distributions_points - + self.metric_buckets.distributions_points, + + rhs.metric_buckets.distributions_points, }, } }