Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 172 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ rand = { version = "0.10.0", features = ["thread_rng"]}
rand_distr = "0.6.0"
flatbuffers = "25.12.19"
serde_json = "1.0.149"
tokio = { version = "1.51.0", features = ["full"] }
futures = "0.3.32"

[target.'cfg(windows)'.dependencies]
rdkafka = { version="0.39.0", features = ["cmake_build"] }
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36

`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps.


## `count` - Count topic data rates

`count` is used for viewing the current data rate in a given topic.

An example of using this could be:

`saluki count mybroker:9092/mytopic --message-interval 3` - this listens to the `mytopic` topic and prints the data rate between 3 second intervals.

# Developer setup

some system dependencies are required. On Windows these are built-in, but on a debian-based linux distro you will need `libcurl4-openssl-dev`
Expand Down
4 changes: 2 additions & 2 deletions src/consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub fn consume(
}

consumer
.subscribe(&[&*topic.topic])
.subscribe(&[&topic.topic])
.unwrap_or_else(|_| panic!("Failed to subscribe to topic {}", topic.topic));

let mut counter = 0;
Expand Down Expand Up @@ -119,7 +119,7 @@ pub fn consume(
}
counter += 1;
if Some(counter) == num_messages || Some(counter) == last {
info!("Reached {} messages, exiting", counter);
println!("Reached {} messages, exiting", counter);
break;
}
}
Expand Down
51 changes: 51 additions & 0 deletions src/count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::cli_utils::BrokerAndTopic;
use futures::stream::StreamExt;
use log::error;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::{ClientConfig, Message};
use tokio::time::{self, Duration};
use uuid::Uuid;

pub async fn count(topic: BrokerAndTopic, message_interval: u64) {
let consumer: StreamConsumer<DefaultConsumerContext> = ClientConfig::new()
.set("group.id", Uuid::new_v4().to_string())
.set("bootstrap.servers", topic.broker())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should provide some way to pass extra parameters here - either as command line args, in a config file, in env variables, whatever.

Probably extra command line args is easiest.

I'd rather not have to recompile if it turns out we need to specify a queue size or something.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll do this separately for all commands as part of #14

.create()
.expect("Consumer creation failed");

consumer
.subscribe(&[&topic.topic])
.unwrap_or_else(|e| panic!("Failed to subscribe to topic {}: {}", topic.topic, e));

let start = std::time::Instant::now();
let mut bytes_this_second: usize = 0;
let mut total_bytes: usize = 0;
let mut stream = consumer.stream();
let mut interval = time::interval(Duration::from_secs(message_interval));

loop {
tokio::select! {
_msg = stream.next() => {
match _msg {
Some(Ok(msg)) => {
if msg.payload().is_some() {
bytes_this_second += msg.payload_len();
total_bytes += msg.payload_len();
}

},
Some(Err(e)) => error!("Error reading from stream {:?}", e),
None => {}
}
}
_ = interval.tick() => {
println!("{:.5} Mbit/s (since program start: average {:.5} Mbit/s, {:.5} MB total)",
bytes_this_second as f64/125000.0,
Copy link
Copy Markdown
Member

@Tom-Willemsen Tom-Willemsen Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only true if interval is 1.

Streaming at ~190 mbit/s with --interval 5:

985.75168 Mbit/s (since program start: average 189.82563 Mbit/s, 9609.95335 MB total)

bytes_this_second needs dividing by interval to be correct here

total_bytes as f64 / 125000.0 / start.elapsed().as_secs_f64(),
total_bytes as f64 / 1_000_000.0
);
bytes_this_second = 0;
}
}
}
}
Loading