Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions datadog-ipc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,39 @@ fn gen_serve_fn(
.map(|(attrs, n, _)| quote! { #(#attrs)* #n })
.collect();

let force_flush = if m.is_blocking {
quote! { true }
} else {
quote! { false }
};

let response_code = if m.return_type.is_some() {
// Flush any buffered acks before sending the typed response, so the client's
// send_count/ack_count loop processes them before reading the real reply.
quote! {
#[cfg(target_os = "linux")]
if __pending_acks > 0 {
datadog_ipc::send_acks_async(&async_fd, __pending_acks).await;
__pending_acks = 0;
}
let result = handler.#name(peer, #(#field_names),*).await;
let __resp_data = datadog_ipc::codec::encode(&result);
datadog_ipc::send_raw_async(&async_fd, &__resp_data).await.ok();
}
} else {
// On Linux, buffer up to 20 acks and flush in a single
// sendmmsg(2) syscall; on other platforms send each ack immediately.
quote! {
handler.#name(peer, #(#field_names),*).await;
#[cfg(target_os = "linux")]
{
__pending_acks += 1;
if #force_flush || __pending_acks >= datadog_ipc::ACK_BUFFER_SIZE {
datadog_ipc::send_acks_async(&async_fd, __pending_acks).await;
__pending_acks = 0;
}
}
#[cfg(not(target_os = "linux"))]
// 1-byte ack: distinguishable from EOF (0 bytes from recvmsg on closed socket).
datadog_ipc::send_raw_async(&async_fd, &[0u8]).await.ok();
}
Expand All @@ -283,20 +307,24 @@ fn gen_serve_fn(
return;
}
};
// Pending 1-byte acks for fire-and-forget methods, flushed via sendmmsg(2) on Linux.
#[cfg(target_os = "linux")]
let mut __pending_acks: u32 = 0;
loop {
let (buf, fds) = match datadog_ipc::recv_raw_async(&async_fd).await {
Ok(x) => x,
let (mut req, fds) = match datadog_ipc::recv_raw_async(
&async_fd,
|buf| datadog_ipc::codec::decode::<#enum_name>(buf),
).await {
Ok((Ok(req), fds)) => (req, fds),
Ok((Err(_), _)) => {
::tracing::warn!("IPC serve: failed to decode request");
break;
}
Err(e) => {
::tracing::trace!("IPC serve: recv (connection closed?): {e}");
break;
}
};
let Ok(mut req) =
datadog_ipc::codec::decode::<#enum_name>(&buf)
else {
::tracing::warn!("IPC serve: failed to decode request");
break;
};
let mut __source = datadog_ipc::handles::FdSource::new(fds);
if datadog_ipc::handles::TransferHandles::receive_handles(
&mut req,
Expand Down
43 changes: 20 additions & 23 deletions datadog-ipc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ pub struct IpcClientConn {
recv_buf: Vec<u8>,
/// Set to true when a fatal I/O error occurs on send or receive.
closed: bool,
/// Skip draining when the caller already drained
drained_acks_since_send: bool,
}

impl IpcClientConn {
Expand All @@ -42,7 +40,6 @@ impl IpcClientConn {
ack_count: 0,
recv_buf: vec![0u8; max_message_size() + HANDLE_SUFFIX_SIZE],
closed: false,
drained_acks_since_send: true,
}
}

Expand All @@ -64,18 +61,18 @@ impl IpcClientConn {
self.send_count - self.ack_count
}

/// Non-blocking drain of all pending acks for client side. Updates `ack_count`.
/// Non-blocking drain of pending acks up to the number of outstanding messages.
///
/// Passing `outstanding()` as the bound avoids initialising more kernel structures than
/// needed and skips the final `WouldBlock` probe when all expected acks have arrived.
/// On Linux uses `recvmmsg` to batch-receive up to 100 acks per syscall.
pub fn drain_acks(&mut self) {
self.drained_acks_since_send = true;
loop {
match self.conn.try_recv_raw(&mut self.recv_buf) {
Ok(_) => self.ack_count += 1,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => {
warn!("drain_acks: connection error ({}), marking closed", e);
self.closed = true;
break;
}
let max = self.outstanding() as usize;
match self.conn.drain_acks_nonblocking(max) {
Ok(count) => self.ack_count += count as u64,
Err(e) => {
warn!("drain_acks: connection error ({}), marking closed", e);
self.closed = true;
}
}
}
Expand All @@ -85,11 +82,6 @@ impl IpcClientConn {
/// Returns `false` if the socket would block (EAGAIN).
/// `data` is unmodified after the call.
pub fn try_send(&mut self, data: &mut Vec<u8>, fds: &[RawFd]) -> bool {
if !self.drained_acks_since_send {
self.drain_acks();
}
self.drained_acks_since_send = false;

match self.conn.try_send_raw(data, fds) {
Ok(()) => {
self.send_count += 1;
Expand Down Expand Up @@ -117,15 +109,20 @@ impl IpcClientConn {

/// Blocking send + blocking receive of response.
///
/// Sends `data`/`fds` (blocking), then receives in a loop, skipping any
/// intermediate 0-byte acks for prior fire-and-forget messages, until the
/// ack for this specific send arrives. Returns the response bytes and any
/// transferred file descriptors.
/// Drains any pending fire-and-forget acks (non-blocking, batched on Linux via `recvmmsg`)
/// before sending, so the subsequent blocking recv loop only needs to wait for the single
/// response ack. Sends `data`/`fds` (blocking), then receives in a loop until the ack
/// for this specific send arrives. Returns the response bytes and any transferred file
/// descriptors.
pub fn call(
&mut self,
data: &mut Vec<u8>,
fds: &[RawFd],
) -> io::Result<(Vec<u8>, Vec<OwnedFd>)> {
self.drain_acks();
if self.closed {
return Err(io::Error::from(io::ErrorKind::BrokenPipe));
}
self.conn.send_raw_blocking(data, fds).inspect_err(|e| {
warn!("call: send failed ({}), marking closed", e);
self.closed = true;
Expand Down
6 changes: 6 additions & 0 deletions datadog-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ pub mod client;
pub mod codec;

pub use client::IpcClientConn;
#[cfg(target_os = "linux")]
pub use platform::send_acks_async;

/// Maximum number of 1-byte acks buffered per connection before a forced flush.
/// Must match the `MAX_BATCH` limit inside `send_acks_async`.
pub const ACK_BUFFER_SIZE: u32 = 20;
pub use platform::{
max_message_size, AsyncConn, PeerCredentials, SeqpacketConn, SeqpacketListener,
HANDLE_SUFFIX_SIZE,
Expand Down
Loading
Loading