From 47d6cfe32afc3e71e95dfce77d5d786ff96d9969 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Mon, 30 Mar 2026 22:55:11 +0200 Subject: [PATCH 1/6] Batch ack consumption The newly introduced acks always require an additional syscall to consume them. Using recvmmsg() to batch it. Also only consume them when we actually need to check for limits. Also avoid resending Start each time we access the TelemetryCachedClient. (Pure overhead) Signed-off-by: Bob Weinand --- datadog-ipc/src/client.rs | 40 +++---- datadog-ipc/src/platform/unix/sockets/mod.rs | 76 +++++++++++++ datadog-ipc/src/platform/windows/sockets.rs | 16 +++ datadog-sidecar/src/service/sender.rs | 5 + datadog-sidecar/src/service/sidecar_server.rs | 101 +++++++++++++----- datadog-sidecar/src/service/telemetry.rs | 101 +++++++++--------- libdd-telemetry/src/worker/mod.rs | 5 +- 7 files changed, 243 insertions(+), 101 deletions(-) diff --git a/datadog-ipc/src/client.rs b/datadog-ipc/src/client.rs index 395c852be3..75f2cf2a8f 100644 --- a/datadog-ipc/src/client.rs +++ b/datadog-ipc/src/client.rs @@ -30,8 +30,6 @@ pub struct IpcClientConn { recv_buf: Vec, /// Set to true when a fatal I/O error occurs on send or receive. closed: bool, - /// Skip draining when the caller already drained - drained_acks_since_send: bool, } impl IpcClientConn { @@ -42,7 +40,6 @@ impl IpcClientConn { ack_count: 0, recv_buf: vec![0u8; max_message_size() + HANDLE_SUFFIX_SIZE], closed: false, - drained_acks_since_send: true, } } @@ -64,18 +61,15 @@ impl IpcClientConn { self.send_count - self.ack_count } - /// Non-blocking drain of all pending acks for client side. Updates `ack_count`. + /// Non-blocking drain of all pending acks. Updates `ack_count`. + /// + /// On Linux uses `recvmmsg` to batch-receive up to 64 acks per syscall. pub fn drain_acks(&mut self) { - self.drained_acks_since_send = true; - loop { - match self.conn.try_recv_raw(&mut self.recv_buf) { - Ok(_) => self.ack_count += 1, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) => { - warn!("drain_acks: connection error ({}), marking closed", e); - self.closed = true; - break; - } + match self.conn.drain_acks_nonblocking() { + Ok(count) => self.ack_count += count as u64, + Err(e) => { + warn!("drain_acks: connection error ({}), marking closed", e); + self.closed = true; } } } @@ -85,11 +79,6 @@ impl IpcClientConn { /// Returns `false` if the socket would block (EAGAIN). /// `data` is unmodified after the call. pub fn try_send(&mut self, data: &mut Vec, fds: &[RawFd]) -> bool { - if !self.drained_acks_since_send { - self.drain_acks(); - } - self.drained_acks_since_send = false; - match self.conn.try_send_raw(data, fds) { Ok(()) => { self.send_count += 1; @@ -117,15 +106,20 @@ impl IpcClientConn { /// Blocking send + blocking receive of response. /// - /// Sends `data`/`fds` (blocking), then receives in a loop, skipping any - /// intermediate 0-byte acks for prior fire-and-forget messages, until the - /// ack for this specific send arrives. Returns the response bytes and any - /// transferred file descriptors. + /// Drains any pending fire-and-forget acks (non-blocking, batched on Linux via `recvmmsg`) + /// before sending, so the subsequent blocking recv loop only needs to wait for the single + /// response ack. Sends `data`/`fds` (blocking), then receives in a loop until the ack + /// for this specific send arrives. Returns the response bytes and any transferred file + /// descriptors. pub fn call( &mut self, data: &mut Vec, fds: &[RawFd], ) -> io::Result<(Vec, Vec)> { + self.drain_acks(); + if self.closed { + return Err(io::Error::from(io::ErrorKind::BrokenPipe)); + } self.conn.send_raw_blocking(data, fds).inspect_err(|e| { warn!("call: send failed ({}), marking closed", e); self.closed = true; diff --git a/datadog-ipc/src/platform/unix/sockets/mod.rs b/datadog-ipc/src/platform/unix/sockets/mod.rs index bed3b15baf..e12073b4bb 100644 --- a/datadog-ipc/src/platform/unix/sockets/mod.rs +++ b/datadog-ipc/src/platform/unix/sockets/mod.rs @@ -9,6 +9,8 @@ use nix::sys::socket::{recvmsg, sendmsg, AddressFamily, SockFlag, SockType}; pub use nix::sys::socket::{ControlMessage, ControlMessageOwned, MsgFlags, UnixAddr}; +#[cfg(target_os = "linux")] +use std::mem::MaybeUninit; use std::{ io, os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, @@ -286,6 +288,80 @@ impl SeqpacketConn { recvmsg_raw(self.inner.as_raw_fd(), buf, MsgFlags::empty()) } + /// Non-blocking drain of all available ack messages. Returns the count drained. + /// + /// Acks are always 1-byte payloads with no file descriptors, so a minimal per-slot buffer + /// suffices. On Linux, `recvmmsg(2)` batches up to 100 receives into a single syscall; on + /// other platforms individual `try_recv_raw` calls are used instead. + pub fn drain_acks_nonblocking(&self) -> io::Result { + #[cfg(target_os = "linux")] + { + const BATCH: usize = 100; + // SAFETY: bufs/iovs/msgs are written before being passed to recvmmsg. + // msg_name and msg_control must be null; all other fields are either set + // explicitly below or are output-only (msg_flags, msg_len). + let mut bufs = [const { MaybeUninit::<[u8; 1]>::uninit() }; BATCH]; + let mut iovs = [const { MaybeUninit::::uninit() }; BATCH]; + let mut msgs = [const { MaybeUninit::::uninit() }; BATCH]; + for i in 0..BATCH { + unsafe { + (*iovs[i].as_mut_ptr()).iov_base = bufs[i].as_mut_ptr() as *mut libc::c_void; + (*iovs[i].as_mut_ptr()).iov_len = 1; + (*msgs[i].as_mut_ptr()).msg_hdr.msg_name = std::ptr::null_mut(); + (*msgs[i].as_mut_ptr()).msg_hdr.msg_namelen = 0; + // addr_of_mut avoids creating a mutable reference to an array slot. + (*msgs[i].as_mut_ptr()).msg_hdr.msg_iov = iovs[i].as_mut_ptr(); + (*msgs[i].as_mut_ptr()).msg_hdr.msg_iovlen = 1; + (*msgs[i].as_mut_ptr()).msg_hdr.msg_control = std::ptr::null_mut(); + (*msgs[i].as_mut_ptr()).msg_hdr.msg_controllen = 0; + } + } + let fd = self.inner.as_raw_fd(); + let mut total = 0usize; + loop { + let n = unsafe { + libc::recvmmsg( + fd, + msgs[0].as_mut_ptr(), + BATCH as libc::c_uint, + libc::MSG_DONTWAIT as _, // seems to be inconsistent, sometimes i32, u32 + std::ptr::null_mut(), + ) + }; + if n < 0 { + let e = io::Error::last_os_error(); + return if e.kind() == io::ErrorKind::WouldBlock { + Ok(total) + } else { + Err(e) + }; + } + let n = n as usize; + for msg in msgs.iter().take(n) { + if unsafe { msg.assume_init_ref() }.msg_len == 0 { + return Err(io::Error::from(io::ErrorKind::BrokenPipe)); + } + } + total += n; + if n < BATCH { + return Ok(total); + } + } + } + #[cfg(not(target_os = "linux"))] + { + let mut buf = [0u8; 1]; + let mut total = 0usize; + loop { + match self.try_recv_raw(&mut buf) { + Ok(_) => total += 1, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(total), + Err(e) => return Err(e), + } + } + } + } + /// Blocking receive. Polls for readability (respecting read_timeout), then receives. pub fn recv_raw_blocking(&self, buf: &mut [u8]) -> io::Result<(usize, Vec)> { let fd = self.inner.as_raw_fd(); diff --git a/datadog-ipc/src/platform/windows/sockets.rs b/datadog-ipc/src/platform/windows/sockets.rs index 57eb3ddf96..3abac5faae 100644 --- a/datadog-ipc/src/platform/windows/sockets.rs +++ b/datadog-ipc/src/platform/windows/sockets.rs @@ -648,6 +648,22 @@ impl SeqpacketConn { pipe_read(self.raw_handle(), buf, false) } + /// Non-blocking drain of all available ack messages. Returns the count drained. + /// + /// Acks are always 1-byte payloads with no handles; the buffer is sized for the wire + /// format: 1 payload byte + the 4-byte handle-count suffix = `1 + HANDLE_SUFFIX_SIZE`. + pub fn drain_acks_nonblocking(&self) -> io::Result { + let mut buf = [0u8; 1 + HANDLE_SUFFIX_SIZE]; + let mut total = 0usize; + loop { + match self.try_recv_raw(&mut buf) { + Ok(_) => total += 1, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(total), + Err(e) => return Err(e), + } + } + } + /// Blocking receive. /// /// `buf` must be at least `payload_max + HANDLE_SUFFIX_SIZE` bytes. diff --git a/datadog-sidecar/src/service/sender.rs b/datadog-sidecar/src/service/sender.rs index db1c9eb83b..c21d62edfa 100644 --- a/datadog-sidecar/src/service/sender.rs +++ b/datadog-sidecar/src/service/sender.rs @@ -169,6 +169,11 @@ impl SidecarSender { /// Non-blocking drain of the outbox. Returns `true` if all messages were sent. fn try_drain_outbox(&mut self) -> bool { + // Drain pending acks when approaching the throttle threshold so the socket + // receive buffer doesn't fill up and block the sidecar from sending more acks. + if self.channel.0.outstanding() >= self.max_outstanding / 2 { + self.channel.0.drain_acks(); + } for slot in self.outbox.slots_mut() { if let Some(msg) = slot { if self.channel.0.outstanding() >= self.max_outstanding { diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index a2be2752b6..ea813276be 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -296,12 +296,24 @@ impl SidecarServer { let futures = clients .values() - .filter_map(|client| client.client.lock_or_panic().worker.stats().ok()) + .filter_map(|client| { + client + .client + .lock_or_panic() + .as_ref() + .and_then(|c| c.worker.stats().ok()) + }) .collect::>(); let metric_counts = clients .values() - .map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32) + .map(|client| { + client + .client + .lock_or_panic() + .as_ref() + .map_or(0, |c| c.telemetry_metrics.len() as u32) + }) .collect::>(); (futures, metric_counts) @@ -399,26 +411,45 @@ impl SidecarInterface for ConnectionSidecarHandler { let process_tags = session.process_tags.lock_or_panic().clone(); - // Lock telemetry client + // Pre-compute session config so both the primary and retry get_or_create calls + // can use it without re-locking the session. + let session_config = session + .session_config + .lock_or_panic() + .as_ref() + .cloned() + .unwrap_or_else(|| { + warn!("Failed to get telemetry session config for {instance_id:?}"); + Config::default() + }); + + // Get or create the telemetry client. If we observe None under the lock it means + // another thread called take() (Stop) in the narrow window between get_or_create + // returning and us acquiring the lock — retry once to get a fresh client. let telemetry_mutex = self.server.telemetry_clients.get_or_create( service, env, &instance_id, &runtime_metadata, - || { - session - .session_config - .lock_or_panic() - .as_ref() - .cloned() - .unwrap_or_else(|| { - warn!("Failed to get telemetry session config for {instance_id:?}"); - Config::default() - }) - }, - process_tags, + || session_config.clone(), + process_tags.clone(), ); - let mut telemetry = telemetry_mutex.lock_or_panic(); + let telemetry_mutex = if telemetry_mutex.lock_or_panic().is_none() { + self.server.telemetry_clients.get_or_create( + service, + env, + &instance_id, + &runtime_metadata, + || session_config, + process_tags, + ) + } else { + telemetry_mutex + }; + let mut telemetry_guard = telemetry_mutex.lock_or_panic(); + let Some(telemetry) = telemetry_guard.as_mut() else { + return; // extremely rare: stopped again between the two get_or_create calls + }; // Auto-register any metrics known to this connection but not yet registered // in this telemetry client (e.g., the client was just created for a new service/env). @@ -473,6 +504,18 @@ impl SidecarInterface for ConnectionSidecarHandler { } } + if buffered_info_changed { + info!( + "Buffered telemetry info changed for instance {instance_id:?} and queue_id {queue_id:?}" + ); + telemetry.write_shm_file(); + } + + // take() must happen INSIDE the spawned task, after process_actions completes, + // so that a Config batch spawned before a Stop batch still finds Some when it + // runs (the last_handle chain guarantees Stop runs after Config). + let do_take = remove_client; + if !actions_to_process.is_empty() { let telemetry_mutex_clone = telemetry_mutex.clone(); let worker = telemetry.worker.clone(); @@ -481,9 +524,17 @@ impl SidecarInterface for ConnectionSidecarHandler { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; - let processed = telemetry_mutex_clone - .lock_or_panic() - .process_actions(actions_to_process); + let processed = { + let mut guard = telemetry_mutex_clone.lock_or_panic(); + let processed = guard + .as_mut() + .map(|t| t.process_actions(actions_to_process)) + .unwrap_or_default(); + if do_take { + guard.take(); // drop client after Stop action is processed + } + processed + }; debug!("Sending Processed Actions :{processed:?}"); worker.send_msgs(processed).await.ok(); })); @@ -504,18 +555,14 @@ impl SidecarInterface for ConnectionSidecarHandler { })); } - if buffered_info_changed { - info!( - "Buffered telemetry info changed for instance {instance_id:?} and queue_id {queue_id:?}" - ); - telemetry.write_shm_file(); - } - + // telemetry borrow ends after the last use of telemetry.handle above. + // Remove from the map synchronously so new get_or_create calls get a fresh entry; + // take() is deferred to the spawned task to avoid racing with in-flight tasks. if remove_client { - info!("Removing telemetry client for instance {instance_id:?}"); self.server .telemetry_clients .remove_telemetry_client(service, env); + info!("Removing telemetry client for instance {instance_id:?}"); } } else { info!("No application found for instance {instance_id:?} and queue_id {queue_id:?}"); diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index e9beefedf9..f994665c7a 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -56,7 +56,7 @@ struct ComposerPackages { pub struct TelemetryCachedEntry { last_used: Instant, - pub client: Arc>, + pub client: Arc>>, } pub struct TelemetryCachedClient { @@ -76,7 +76,7 @@ impl TelemetryCachedClient { env: &str, instance_id: &InstanceId, runtime_meta: &RuntimeMetadata, - get_config: impl FnOnce() -> libdd_telemetry::config::Config, + get_config: impl FnOnce() -> Config, process_tags: Vec, ) -> Self { let mut builder = TelemetryWorkerBuilder::new_fetch_host( @@ -99,10 +99,18 @@ impl TelemetryCachedClient { builder.config = config.clone(); let (handle, _join) = builder.spawn(); + info!("spawned telemetry worker {config:?}"); + + let worker = handle.clone(); + tokio::spawn(async move { + worker + .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await + .ok(); + }); - info!("spawning telemetry worker {config:?}"); Self { - worker: handle.clone(), + worker: handle, shm_writer: { #[allow(clippy::unwrap_used)] OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap() @@ -219,7 +227,10 @@ impl TelemetryCachedClient { // cheap way to avoid unbounded caching const CACHE_INTERVAL: u64 = 2000; let last_clean = LAST_CACHE_CLEAN.load(Ordering::Relaxed); - let now_secs = Instant::now().elapsed().as_secs(); + let now_secs = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); if now_secs > last_clean + CACHE_INTERVAL && LAST_CACHE_CLEAN .compare_exchange( @@ -305,57 +316,42 @@ impl TelemetryCachedClientSet { runtime_meta: &RuntimeMetadata, get_config: F, process_tags: Vec, - ) -> Arc> + ) -> Arc>> where - F: FnOnce() -> libdd_telemetry::config::Config, + F: FnOnce() -> Config, { let key = (service.to_string(), env.to_string()); let mut map = self.inner.lock_or_panic(); if let Some(existing) = map.get_mut(&key) { - existing.last_used = Instant::now(); - tokio::spawn({ - let worker = existing.client.lock_or_panic().worker.clone(); - async move { - worker - .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) - .await - .ok(); - } - }); - - info!("Reusing existing telemetry client for {key:?}"); - return existing.client.clone(); + if existing.client.lock_or_panic().is_some() { + existing.last_used = Instant::now(); + info!("Reusing existing telemetry client for {key:?}"); + return existing.client.clone(); + } + // Dead (None) entry — fall through to replace it with a fresh client. } - let entry = TelemetryCachedEntry { - last_used: Instant::now(), - client: Arc::new(Mutex::new(TelemetryCachedClient::new( - service, - env, - instance_id, - runtime_meta, - get_config, - process_tags, - ))), - }; - - let entry = map.entry(key.clone()).or_insert(entry); - - tokio::spawn({ - let worker = entry.client.lock_or_panic().worker.clone(); - async move { - worker - .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) - .await - .ok(); - } - }); + let new_client = Arc::new(Mutex::new(Some(TelemetryCachedClient::new( + service, + env, + instance_id, + runtime_meta, + get_config, + process_tags, + )))); + map.insert( + key.clone(), + TelemetryCachedEntry { + last_used: Instant::now(), + client: new_client.clone(), + }, + ); info!("Created new telemetry client for {key:?}"); - entry.client.clone() + new_client } pub fn remove_telemetry_client(&self, service: &str, env: &str) { @@ -425,7 +421,13 @@ pub(crate) async fn telemetry_action_receiver_task( &actions.service_name, &actions.env_name, ); - let client = telemetry_client.lock_or_panic().worker.clone(); + let Some(client) = telemetry_client + .lock_or_panic() + .as_ref() + .map(|t| t.worker.clone()) + else { + continue; + }; for it_action in actions.actions { match it_action { @@ -442,14 +444,17 @@ pub(crate) async fn telemetry_action_receiver_task( } InternalTelemetryAction::RegisterTelemetryMetric(metric) => { debug!("Registered telemetry metric: {metric:?}"); - telemetry_client.lock_or_panic().register_metric(metric); + if let Some(t) = telemetry_client.lock_or_panic().as_mut() { + t.register_metric(metric); + } } InternalTelemetryAction::AddMetricPoint((value, name, tags)) => { let metric_name = name.clone(); let actions_point_opt = { telemetry_client .lock_or_panic() - .to_telemetry_point((name, value, tags)) + .as_ref() + .and_then(|t| t.to_telemetry_point((name, value, tags))) }; if let Some(actions_point) = actions_point_opt { match client.send_msg(actions_point).await { @@ -473,7 +478,7 @@ fn get_telemetry_client( instance_id: &InstanceId, service_name: &str, env_name: &str, -) -> Arc> { +) -> Arc>> { let session = sidecar.get_session(&instance_id.session_id); let trace_config = session.get_trace_config(); let runtime_meta = RuntimeMetadata::new( diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 2bcbd0163b..d978c74d92 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -242,10 +242,9 @@ impl Add for TelemetryWorkerStats { buckets: self.metric_buckets.buckets + rhs.metric_buckets.buckets, series: self.metric_buckets.series + rhs.metric_buckets.series, series_points: self.metric_buckets.series_points + rhs.metric_buckets.series_points, - distributions: self.metric_buckets.distributions - + self.metric_buckets.distributions, + distributions: self.metric_buckets.distributions + rhs.metric_buckets.distributions, distributions_points: self.metric_buckets.distributions_points - + self.metric_buckets.distributions_points, + + rhs.metric_buckets.distributions_points, }, } } From 7974f66444b2220546f6b53543fa4dc40911dd50 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Wed, 1 Apr 2026 15:03:28 +0200 Subject: [PATCH 2/6] Batch ack sending too Signed-off-by: Bob Weinand --- datadog-ipc-macros/src/lib.rs | 27 ++++++++ datadog-ipc/src/lib.rs | 2 + datadog-ipc/src/platform/unix/sockets/mod.rs | 67 ++++++++++++++++++++ datadog-sidecar/src/service/blocking.rs | 2 +- 4 files changed, 97 insertions(+), 1 deletion(-) diff --git a/datadog-ipc-macros/src/lib.rs b/datadog-ipc-macros/src/lib.rs index e85e30a883..c34c9a3cd8 100644 --- a/datadog-ipc-macros/src/lib.rs +++ b/datadog-ipc-macros/src/lib.rs @@ -248,15 +248,39 @@ fn gen_serve_fn( .map(|(attrs, n, _)| quote! { #(#attrs)* #n }) .collect(); + let force_flush = if m.is_blocking { + quote! { true } + } else { + quote! { false } + }; + let response_code = if m.return_type.is_some() { + // Flush any buffered acks before sending the typed response, so the client's + // send_count/ack_count loop processes them before reading the real reply. quote! { + #[cfg(target_os = "linux")] + if __pending_acks > 0 { + datadog_ipc::send_acks_async(&async_fd, __pending_acks).await; + __pending_acks = 0; + } let result = handler.#name(peer, #(#field_names),*).await; let __resp_data = datadog_ipc::codec::encode(&result); datadog_ipc::send_raw_async(&async_fd, &__resp_data).await.ok(); } } else { + // On Linux, buffer up to 20 acks and flush in a single + // sendmmsg(2) syscall; on other platforms send each ack immediately. quote! { handler.#name(peer, #(#field_names),*).await; + #[cfg(target_os = "linux")] + { + __pending_acks += 1; + if #force_flush || __pending_acks >= 20 { + datadog_ipc::send_acks_async(&async_fd, __pending_acks).await; + __pending_acks = 0; + } + } + #[cfg(not(target_os = "linux"))] // 1-byte ack: distinguishable from EOF (0 bytes from recvmsg on closed socket). datadog_ipc::send_raw_async(&async_fd, &[0u8]).await.ok(); } @@ -283,6 +307,9 @@ fn gen_serve_fn( return; } }; + // Pending 1-byte acks for fire-and-forget methods, flushed via sendmmsg(2) on Linux. + #[cfg(target_os = "linux")] + let mut __pending_acks: u32 = 0; loop { let (buf, fds) = match datadog_ipc::recv_raw_async(&async_fd).await { Ok(x) => x, diff --git a/datadog-ipc/src/lib.rs b/datadog-ipc/src/lib.rs index 1ea96791da..e031b0189d 100644 --- a/datadog-ipc/src/lib.rs +++ b/datadog-ipc/src/lib.rs @@ -17,6 +17,8 @@ pub mod client; pub mod codec; pub use client::IpcClientConn; +#[cfg(target_os = "linux")] +pub use platform::send_acks_async; pub use platform::{ max_message_size, AsyncConn, PeerCredentials, SeqpacketConn, SeqpacketListener, HANDLE_SUFFIX_SIZE, diff --git a/datadog-ipc/src/platform/unix/sockets/mod.rs b/datadog-ipc/src/platform/unix/sockets/mod.rs index e12073b4bb..b3bc95f939 100644 --- a/datadog-ipc/src/platform/unix/sockets/mod.rs +++ b/datadog-ipc/src/platform/unix/sockets/mod.rs @@ -464,3 +464,70 @@ pub async fn send_raw_async(fd: &AsyncConn, data: &[u8]) -> io::Result<()> { } } } + +/// Send `count` 1-byte ack messages using a single `sendmmsg(2)` syscall. +/// +/// Used by the server dispatch loop (Linux only) to flush batched acks for fire-and-forget +/// methods in one syscall instead of one `sendmsg` per call. Best-effort: errors are silently +/// dropped because the client's `send_count`/`ack_count` tracking handles missing acks by +/// waiting for all outstanding ones at the next blocking call. +#[cfg(target_os = "linux")] +pub async fn send_acks_async(fd: &AsyncConn, count: u32) { + const MAX_BATCH: usize = 20; + let count = (count as usize).min(MAX_BATCH); + if count == 0 { + return; + } + + // Only `offset` (a usize) lives across `.await` — keeping the future `Send`. + // The !Send iovec/mmsghdr arrays are constructed inside the synchronous try_io + // closure so they never appear in the async state machine. + let mut offset = 0usize; + loop { + let mut guard = match fd.writable().await { + Ok(g) => g, + Err(_) => return, + }; + match guard.try_io(|inner| { + // Build arrays here (inside the synchronous closure, not across .await) so that + // !Send *mut c_void pointers never appear in the async state machine. + // SAFETY: zeroed mmsghdr/iovec are valid initial states; null pointers in unused + // fields (msg_name, msg_control) are correct — the kernel ignores them for SEQPACKET. + let ack_byte: u8 = 0; + let mut iovs: [libc::iovec; MAX_BATCH] = unsafe { core::mem::zeroed() }; + let mut msgs: [libc::mmsghdr; MAX_BATCH] = unsafe { core::mem::zeroed() }; + let batch = count - offset; + for i in 0..batch { + iovs[i].iov_base = &ack_byte as *const u8 as *mut libc::c_void; + iovs[i].iov_len = 1; + msgs[i].msg_hdr.msg_iov = &mut iovs[i] as *mut libc::iovec; + msgs[i].msg_hdr.msg_iovlen = 1; + } + let n = unsafe { + libc::sendmmsg( + inner.as_raw_fd(), + msgs.as_mut_ptr(), + batch as libc::c_uint, + 0, + ) + }; + if n < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(n as usize) + } + }) { + Ok(Ok(sent)) => { + offset += sent; + if offset >= count { + return; + } + // partial send: loop to retry the remainder + } + // drop on error: + // the client may already have terminated and we're processing outstanding messages + Ok(Err(_)) => return, + Err(_would_block) => {} // re-register for writability + } + } +} diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index 8ce4b0f449..d9c0cb79e0 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -87,7 +87,7 @@ impl SidecarTransport { pub fn set_backpressure(&mut self, max_bytes: usize, max_queue: u64) -> io::Result<()> { let mut sender = lock_sender(self)?; - sender.max_outstanding = max_queue; + sender.max_outstanding = max_queue.max(21); #[cfg(unix)] sender.channel.0.conn.set_sndbuf_size(max_bytes)?; #[cfg(not(unix))] From 9734363bc661e04b085d404f7757901a7a988fda Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Wed, 1 Apr 2026 16:23:29 +0200 Subject: [PATCH 3/6] Avoid reallocating read buffer all the time Signed-off-by: Bob Weinand --- datadog-ipc-macros/src/lib.rs | 17 ++++--- datadog-ipc/src/platform/unix/sockets/mod.rs | 53 +++++++++++++++----- datadog-ipc/src/platform/windows/sockets.rs | 53 ++++++++++++++------ 3 files changed, 87 insertions(+), 36 deletions(-) diff --git a/datadog-ipc-macros/src/lib.rs b/datadog-ipc-macros/src/lib.rs index c34c9a3cd8..d88a9415d8 100644 --- a/datadog-ipc-macros/src/lib.rs +++ b/datadog-ipc-macros/src/lib.rs @@ -311,19 +311,20 @@ fn gen_serve_fn( #[cfg(target_os = "linux")] let mut __pending_acks: u32 = 0; loop { - let (buf, fds) = match datadog_ipc::recv_raw_async(&async_fd).await { - Ok(x) => x, + let (mut req, fds) = match datadog_ipc::recv_raw_async( + &async_fd, + |buf| datadog_ipc::codec::decode::<#enum_name>(buf), + ).await { + Ok((Ok(req), fds)) => (req, fds), + Ok((Err(_), _)) => { + ::tracing::warn!("IPC serve: failed to decode request"); + break; + } Err(e) => { ::tracing::trace!("IPC serve: recv (connection closed?): {e}"); break; } }; - let Ok(mut req) = - datadog_ipc::codec::decode::<#enum_name>(&buf) - else { - ::tracing::warn!("IPC serve: failed to decode request"); - break; - }; let mut __source = datadog_ipc::handles::FdSource::new(fds); if datadog_ipc::handles::TransferHandles::receive_handles( &mut req, diff --git a/datadog-ipc/src/platform/unix/sockets/mod.rs b/datadog-ipc/src/platform/unix/sockets/mod.rs index b3bc95f939..42c0088a70 100644 --- a/datadog-ipc/src/platform/unix/sockets/mod.rs +++ b/datadog-ipc/src/platform/unix/sockets/mod.rs @@ -12,6 +12,7 @@ pub use nix::sys::socket::{ControlMessage, ControlMessageOwned, MsgFlags, UnixAd #[cfg(target_os = "linux")] use std::mem::MaybeUninit; use std::{ + cell::RefCell, io, os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, sync::atomic::{AtomicUsize, Ordering}, @@ -428,22 +429,50 @@ pub type AsyncConn = AsyncFd; /// Async receive on a Tokio `AsyncFd`-wrapped IPC connection. /// -/// Allocates a buffer sized to `max_message_size()` per call and returns only the received -/// bytes (truncated), so no large buffer is held between receives. +/// Receives into a thread-local buffer pre-sized to `max_message_size()` (avoids a per-call +/// heap allocation of up to 4 MB), then calls `decode` with the received byte slice so the +/// caller can decode in-place without an intermediate `Vec`. Zero buffer allocations per +/// call once the thread-local has grown to full size. +/// +/// The buffer is borrowed only inside the synchronous `try_io` closure and is always released +/// before the next `.await`, so it is safe to reuse across concurrent tasks on the same thread +/// (Tokio's cooperative scheduler never runs two tasks simultaneously on the same thread). +/// +/// `decode` is wrapped in `Option` so that the `FnOnce` callback can be moved into the +/// `FnMut` closure required by `try_io`; it is only called once (on successful receive). /// /// Used by the server dispatch loop (generated by `#[service]` macro). -pub async fn recv_raw_async(fd: &AsyncConn) -> io::Result<(Vec, Vec)> { +pub async fn recv_raw_async(fd: &AsyncConn, decode: F) -> io::Result<(T, Vec)> +where + F: FnOnce(&[u8]) -> T, +{ + thread_local! { + /// Reusable receive buffer. Grows on first use; never shrinks. + static RECV_BUF: RefCell> = const { RefCell::new(Vec::new()) }; + } + // Wrap in Option to satisfy FnMut (take() is only called on successful receive). + let mut decode = Some(decode); loop { let mut guard = fd.readable().await?; - let mut buf = Vec::with_capacity(max_message_size()); - // SAFETY: all bit patterns are valid for u8; recvmsg writes exactly n bytes into - // the spare capacity before set_len(n) is called below. - let slice = unsafe { std::slice::from_raw_parts_mut(buf.as_mut_ptr(), max_message_size()) }; - match guard.try_io(|inner| recvmsg_raw(inner.as_raw_fd(), slice, MsgFlags::empty())) { - Ok(Ok((n, fds))) => { - unsafe { buf.set_len(n) }; - return Ok((buf, fds)); - } + match guard.try_io(|inner| { + RECV_BUF.with_borrow_mut(|buf| { + let size = max_message_size(); + if buf.len() < size { + buf.resize(size, 0u8); + } + match recvmsg_raw(inner.as_raw_fd(), buf, MsgFlags::empty()) { + Err(e) => Err(e), + Ok((n, fds)) => { + // recvmsg_raw's &mut borrow of buf has ended; safe to read &buf[..n]. + #[allow(clippy::unwrap_used)] // SAFETY: take() is only reached once + unsafe { + Ok((decode.take().unwrap_unchecked()(&buf[..n]), fds)) + } + } + } + }) + }) { + Ok(Ok(x)) => return Ok(x), Ok(Err(e)) => return Err(e), Err(_would_block) => continue, } diff --git a/datadog-ipc/src/platform/windows/sockets.rs b/datadog-ipc/src/platform/windows/sockets.rs index 3abac5faae..2acb0629a7 100644 --- a/datadog-ipc/src/platform/windows/sockets.rs +++ b/datadog-ipc/src/platform/windows/sockets.rs @@ -23,17 +23,20 @@ //! bytes beyond the maximum expected payload size. use crate::platform::message::MAX_FDS; -use std::future::Future; -use std::io; -use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle}; -use std::path::Path; -use std::pin::Pin; -use std::ptr::{null, null_mut}; -use std::sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, - Arc, Mutex, -}; use std::task::{Context, Poll}; +use std::{ + cell::RefCell, + future::Future, + io, + os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle}, + path::Path, + pin::Pin, + ptr::{null, null_mut}, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; // winapi – only used for things not cleanly available in windows-sys use winapi::shared::minwindef::ULONG; @@ -874,14 +877,32 @@ impl SeqpacketListener { /// Uses `block_in_place` + raw `ReadFile` to avoid mio's 4 KB internal read- /// buffer limit. For message-mode pipes a single `ReadFile` delivers the /// entire message. -pub async fn recv_raw_async(conn: &AsyncConn) -> io::Result<(Vec, Vec)> { +/// Receive one IPC message and decode it in-place using the supplied callback. +/// +/// Uses a thread-local buffer sized to `max_message_size() + HANDLE_SUFFIX_SIZE` so that +/// the per-call heap allocation for the receive buffer is eliminated once the thread-local +/// has grown to full size. `decode` is called synchronously inside `block_in_place` with +/// a slice of the received bytes before the buffer is made available for the next receive. +pub async fn recv_raw_async(conn: &AsyncConn, decode: F) -> io::Result<(T, Vec)> +where + F: FnOnce(&[u8]) -> T, +{ + thread_local! { + /// Reusable receive buffer. Grows on first use; never shrinks. + static RECV_BUF: RefCell> = const { RefCell::new(Vec::new()) }; + } let raw = conn.as_raw_handle() as SysHANDLE; tokio::task::block_in_place(|| { - let size = max_message_size() + HANDLE_SUFFIX_SIZE; - let mut buf = vec![0u8; size]; - let (payload_len, handles) = pipe_read(raw, &mut buf, true)?; - buf.truncate(payload_len); - Ok((buf, handles)) + RECV_BUF.with_borrow_mut(|buf| { + let size = max_message_size() + HANDLE_SUFFIX_SIZE; + if buf.len() < size { + buf.resize(size, 0u8); + } + match pipe_read(raw, buf, true) { + Err(e) => Err(e), + Ok((payload_len, handles)) => Ok((decode(&buf[..payload_len]), handles)), + } + }) }) } From b63ac02a696054c7b584cb8460eed0be9ac6feba Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Wed, 1 Apr 2026 17:49:03 +0200 Subject: [PATCH 4/6] Reduce amount of acks polled for Signed-off-by: Bob Weinand --- datadog-ipc/src/client.rs | 9 +++++--- datadog-ipc/src/platform/unix/sockets/mod.rs | 23 +++++++++++++++----- datadog-ipc/src/platform/windows/sockets.rs | 12 ++++++---- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/datadog-ipc/src/client.rs b/datadog-ipc/src/client.rs index 75f2cf2a8f..b617979abd 100644 --- a/datadog-ipc/src/client.rs +++ b/datadog-ipc/src/client.rs @@ -61,11 +61,14 @@ impl IpcClientConn { self.send_count - self.ack_count } - /// Non-blocking drain of all pending acks. Updates `ack_count`. + /// Non-blocking drain of pending acks up to the number of outstanding messages. /// - /// On Linux uses `recvmmsg` to batch-receive up to 64 acks per syscall. + /// Passing `outstanding()` as the bound avoids initialising more kernel structures than + /// needed and skips the final `WouldBlock` probe when all expected acks have arrived. + /// On Linux uses `recvmmsg` to batch-receive up to 100 acks per syscall. pub fn drain_acks(&mut self) { - match self.conn.drain_acks_nonblocking() { + let max = self.outstanding() as usize; + match self.conn.drain_acks_nonblocking(max) { Ok(count) => self.ack_count += count as u64, Err(e) => { warn!("drain_acks: connection error ({}), marking closed", e); diff --git a/datadog-ipc/src/platform/unix/sockets/mod.rs b/datadog-ipc/src/platform/unix/sockets/mod.rs index 42c0088a70..ed239a48cb 100644 --- a/datadog-ipc/src/platform/unix/sockets/mod.rs +++ b/datadog-ipc/src/platform/unix/sockets/mod.rs @@ -289,22 +289,31 @@ impl SeqpacketConn { recvmsg_raw(self.inner.as_raw_fd(), buf, MsgFlags::empty()) } - /// Non-blocking drain of all available ack messages. Returns the count drained. + /// Non-blocking drain of up to `max` available ack messages. Returns the count drained. + /// + /// `max` should be `send_count - ack_count` (the number of outstanding unacknowledged + /// messages); passing a tight bound avoids initialising more kernel structures than needed + /// and lets the loop exit without a final `WouldBlock` syscall. /// /// Acks are always 1-byte payloads with no file descriptors, so a minimal per-slot buffer /// suffices. On Linux, `recvmmsg(2)` batches up to 100 receives into a single syscall; on /// other platforms individual `try_recv_raw` calls are used instead. - pub fn drain_acks_nonblocking(&self) -> io::Result { + pub fn drain_acks_nonblocking(&self, max: usize) -> io::Result { + if max == 0 { + return Ok(0); + } #[cfg(target_os = "linux")] { const BATCH: usize = 100; + // Only initialise as many slots as we could possibly need. + let batch = max.min(BATCH); // SAFETY: bufs/iovs/msgs are written before being passed to recvmmsg. // msg_name and msg_control must be null; all other fields are either set // explicitly below or are output-only (msg_flags, msg_len). let mut bufs = [const { MaybeUninit::<[u8; 1]>::uninit() }; BATCH]; let mut iovs = [const { MaybeUninit::::uninit() }; BATCH]; let mut msgs = [const { MaybeUninit::::uninit() }; BATCH]; - for i in 0..BATCH { + for i in 0..batch { unsafe { (*iovs[i].as_mut_ptr()).iov_base = bufs[i].as_mut_ptr() as *mut libc::c_void; (*iovs[i].as_mut_ptr()).iov_len = 1; @@ -320,11 +329,12 @@ impl SeqpacketConn { let fd = self.inner.as_raw_fd(); let mut total = 0usize; loop { + let this_batch = (max - total).min(batch) as libc::c_uint; let n = unsafe { libc::recvmmsg( fd, msgs[0].as_mut_ptr(), - BATCH as libc::c_uint, + this_batch, libc::MSG_DONTWAIT as _, // seems to be inconsistent, sometimes i32, u32 std::ptr::null_mut(), ) @@ -344,7 +354,7 @@ impl SeqpacketConn { } } total += n; - if n < BATCH { + if n < this_batch as usize || total >= max { return Ok(total); } } @@ -354,6 +364,9 @@ impl SeqpacketConn { let mut buf = [0u8; 1]; let mut total = 0usize; loop { + if total >= max { + return Ok(total); + } match self.try_recv_raw(&mut buf) { Ok(_) => total += 1, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(total), diff --git a/datadog-ipc/src/platform/windows/sockets.rs b/datadog-ipc/src/platform/windows/sockets.rs index 2acb0629a7..f0b6e007ee 100644 --- a/datadog-ipc/src/platform/windows/sockets.rs +++ b/datadog-ipc/src/platform/windows/sockets.rs @@ -651,14 +651,18 @@ impl SeqpacketConn { pipe_read(self.raw_handle(), buf, false) } - /// Non-blocking drain of all available ack messages. Returns the count drained. + /// Non-blocking drain of up to `max` available ack messages. Returns the count drained. /// - /// Acks are always 1-byte payloads with no handles; the buffer is sized for the wire - /// format: 1 payload byte + the 4-byte handle-count suffix = `1 + HANDLE_SUFFIX_SIZE`. - pub fn drain_acks_nonblocking(&self) -> io::Result { + /// `max` should be `send_count - ack_count`. Acks are always 1-byte payloads with no + /// handles; the buffer is sized for the wire format: 1 payload byte + the 4-byte + /// handle-count suffix = `1 + HANDLE_SUFFIX_SIZE`. + pub fn drain_acks_nonblocking(&self, max: usize) -> io::Result { let mut buf = [0u8; 1 + HANDLE_SUFFIX_SIZE]; let mut total = 0usize; loop { + if total >= max { + return Ok(total); + } match self.try_recv_raw(&mut buf) { Ok(_) => total += 1, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(total), From 28c3df1cb6d47b31926d93b3e83171b6807a6d8b Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Thu, 2 Apr 2026 12:51:53 +0200 Subject: [PATCH 5/6] Use a proper struct for Telemetry SHM data Signed-off-by: Bob Weinand --- datadog-sidecar/src/service/sidecar_server.rs | 8 ++-- datadog-sidecar/src/service/telemetry.rs | 38 +++++++++++-------- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index ea813276be..73f6608ed3 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -471,24 +471,24 @@ impl SidecarInterface for ConnectionSidecarHandler { for action in actions { match action { SidecarAction::Telemetry(TelemetryActions::AddIntegration(ref integration)) => { - if telemetry.buffered_integrations.insert(integration.clone()) { + if telemetry.shared.integrations.insert(integration.clone()) { actions_to_process.push(action); buffered_info_changed = true; } } SidecarAction::PhpComposerTelemetryFile(path) => { - if telemetry.buffered_composer_paths.insert(path.clone()) { + if telemetry.shared.composer_paths.insert(path.clone()) { composer_paths_to_process.push(path); buffered_info_changed = true; } } SidecarAction::Telemetry(TelemetryActions::AddConfig(_)) => { - telemetry.config_sent = true; + telemetry.shared.config_sent = true; buffered_info_changed = true; actions_to_process.push(action); } SidecarAction::Telemetry(TelemetryActions::AddEndpoint(_)) => { - telemetry.last_endpoints_push = SystemTime::now(); + telemetry.shared.last_endpoints_push = SystemTime::now(); buffered_info_changed = true; actions_to_process.push(action); } diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index f994665c7a..9ab9d3fce0 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -25,7 +25,7 @@ use zwohash::ZwoHasher; use libdd_common::tag::Tag; use libdd_telemetry::worker::TelemetryWorkerBuilder; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::ops::Sub; use std::sync::LazyLock; use std::time::SystemTime; @@ -62,12 +62,28 @@ pub struct TelemetryCachedEntry { pub struct TelemetryCachedClient { pub worker: TelemetryWorkerHandle, pub shm_writer: OneWayShmWriter, - pub config_sent: bool, - pub buffered_integrations: HashSet, - pub buffered_composer_paths: HashSet, - pub last_endpoints_push: SystemTime, pub telemetry_metrics: HashMap, pub handle: Option>, + pub shared: TelemetryCachedClientShmData, +} + +#[derive(Deserialize, Serialize)] +pub struct TelemetryCachedClientShmData { + pub config_sent: bool, + pub integrations: HashSet, + pub composer_paths: HashSet, + pub last_endpoints_push: SystemTime, +} + +impl Default for TelemetryCachedClientShmData { + fn default() -> Self { + TelemetryCachedClientShmData { + config_sent: false, + integrations: HashSet::new(), + composer_paths: HashSet::new(), + last_endpoints_push: SystemTime::UNIX_EPOCH, + } + } } impl TelemetryCachedClient { @@ -115,22 +131,14 @@ impl TelemetryCachedClient { #[allow(clippy::unwrap_used)] OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap() }, - config_sent: false, - buffered_integrations: HashSet::new(), - buffered_composer_paths: HashSet::new(), - last_endpoints_push: SystemTime::UNIX_EPOCH, + shared: TelemetryCachedClientShmData::default(), telemetry_metrics: Default::default(), handle: None, } } pub fn write_shm_file(&self) { - if let Ok(buf) = bincode::serialize(&( - &self.config_sent, - &self.buffered_integrations, - &self.buffered_composer_paths, - &self.last_endpoints_push, - )) { + if let Ok(buf) = bincode::serialize(&self.shared) { self.shm_writer.write(&buf); } else { warn!("Failed to serialize telemetry data for shared memory"); From bca1af94e1d01625efb0ef3e2d47c918d38c6a4f Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Thu, 2 Apr 2026 16:58:40 +0200 Subject: [PATCH 6/6] PR comments Signed-off-by: Bob Weinand --- datadog-ipc-macros/src/lib.rs | 2 +- datadog-ipc/src/lib.rs | 4 ++++ datadog-ipc/src/platform/unix/sockets/mod.rs | 2 +- datadog-sidecar/src/service/sidecar_server.rs | 4 +++- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/datadog-ipc-macros/src/lib.rs b/datadog-ipc-macros/src/lib.rs index d88a9415d8..dc3a69bf52 100644 --- a/datadog-ipc-macros/src/lib.rs +++ b/datadog-ipc-macros/src/lib.rs @@ -275,7 +275,7 @@ fn gen_serve_fn( #[cfg(target_os = "linux")] { __pending_acks += 1; - if #force_flush || __pending_acks >= 20 { + if #force_flush || __pending_acks >= datadog_ipc::ACK_BUFFER_SIZE { datadog_ipc::send_acks_async(&async_fd, __pending_acks).await; __pending_acks = 0; } diff --git a/datadog-ipc/src/lib.rs b/datadog-ipc/src/lib.rs index e031b0189d..144fcb5442 100644 --- a/datadog-ipc/src/lib.rs +++ b/datadog-ipc/src/lib.rs @@ -19,6 +19,10 @@ pub mod codec; pub use client::IpcClientConn; #[cfg(target_os = "linux")] pub use platform::send_acks_async; + +/// Maximum number of 1-byte acks buffered per connection before a forced flush. +/// Must match the `MAX_BATCH` limit inside `send_acks_async`. +pub const ACK_BUFFER_SIZE: u32 = 20; pub use platform::{ max_message_size, AsyncConn, PeerCredentials, SeqpacketConn, SeqpacketListener, HANDLE_SUFFIX_SIZE, diff --git a/datadog-ipc/src/platform/unix/sockets/mod.rs b/datadog-ipc/src/platform/unix/sockets/mod.rs index ed239a48cb..722ca3bd1a 100644 --- a/datadog-ipc/src/platform/unix/sockets/mod.rs +++ b/datadog-ipc/src/platform/unix/sockets/mod.rs @@ -515,7 +515,7 @@ pub async fn send_raw_async(fd: &AsyncConn, data: &[u8]) -> io::Result<()> { /// waiting for all outstanding ones at the next blocking call. #[cfg(target_os = "linux")] pub async fn send_acks_async(fd: &AsyncConn, count: u32) { - const MAX_BATCH: usize = 20; + const MAX_BATCH: usize = crate::ACK_BUFFER_SIZE as usize; let count = (count as usize).min(MAX_BATCH); if count == 0 { return; diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 73f6608ed3..33dda9a760 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -448,7 +448,9 @@ impl SidecarInterface for ConnectionSidecarHandler { }; let mut telemetry_guard = telemetry_mutex.lock_or_panic(); let Some(telemetry) = telemetry_guard.as_mut() else { - return; // extremely rare: stopped again between the two get_or_create calls + // Extremely rare: the client was stopped between the two get_or_create calls. + warn!("enqueue_actions: telemetry client stopped during retry for instance {instance_id:?}; dropping actions"); + return; }; // Auto-register any metrics known to this connection but not yet registered