[SPARK-56543] Add RTM stateless benchmark #55420

Closed
jerrypeng wants to merge 12 commits into apache:master from jerrypeng:SPARK-56543

Contributor

@jerrypeng jerrypeng commented Apr 20, 2026

What changes were proposed in this pull request?

Adds RTMKafkaKafkaBenchmark, a standalone benchmark program for the Real-Time Mode (RTM) trigger in Structured Streaming. It is a stateless end-to-end Kafka-to-Kafka latency benchmark.

The benchmark is implemented as an object extending org.apache.spark.benchmark.BenchmarkBase, following the same convention as other Spark benchmarks (e.g. StateStoreBasicOperationsBenchmark, MapStatusesSerDeserBenchmark). It is not a ScalaTest suite, so it is not discovered or executed by SBT test or Maven surefire; it only runs when invoked explicitly via runMain or spark-submit.

The benchmark:

  1. Spins up a local-cluster Spark context (3 workers × 5 cores × 1024 MB heap, matching the 5-partition input topic) and a live embedded Kafka broker via KafkaTestUtils.
  2. Generates synthetic records at 1,000 records/sec (recordsPerSecond) into an input Kafka topic (5 partitions, numPartitions) from a background producer thread.
  3. Runs a stateless pipeline with RealTimeTrigger and a 5-minute trigger window (batchDuration): reads from Kafka → base64-encodes the value → stamps a source-timestamp header → writes to
    an output Kafka topic.
  4. Captures per-batch processing latency via Spark's observe() API (logged on StreamingQueryProgress, not written to the result file).
  5. After 4 batches complete (numBatches), reads back the output topic and reports e2e latency percentiles (p0, p50, p90, p95, p99, p100) over batches 2–4 — the first batch is dropped as
    warmup via numBatchesToFilter = 1 — by comparing the source-timestamp header to the Kafka sink timestamp.
  6. Owns its own teardown via try { ... } finally { cleanup() } inside runBenchmarkSuite, with an idempotent cleanup() that stops Spark and tears down the embedded Kafka broker even if
    setup partially fails, the streaming query times out, or post-run analysis throws.

All knobs above (and the Kafka/streaming latency tuning options) are declared as named private vals at the top of the object with a short rationale per value.

Sample benchmark results

 Kafka to kafka query e2e_latency in milliseconds is
  p0:   45
  p50:  70
  p90:  78
  p95:  81
  p99:  85
  p100: 331
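
As a rough illustration of how percentiles like these can be derived from the source-timestamp header and the sink timestamp, the sketch below uses a nearest-rank percentile over plain Scala collections. The `Record` type, the `warmupCutoffMs` parameter, and the nearest-rank method are assumptions made for this example; they are not taken from the benchmark's code, which may compute percentiles differently.

```scala
object LatencyPercentiles {
  // Hypothetical record shape: both timestamps in epoch milliseconds.
  final case class Record(sourceTimestampMs: Long, sinkTimestampMs: Long)

  // Nearest-rank percentile over an already sorted, non-empty sequence.
  // p is an integer percentage in [0, 100].
  def percentile(sorted: IndexedSeq[Long], p: Int): Long = {
    require(sorted.nonEmpty, "no latencies to summarize")
    val rank = (p.toLong * sorted.size + 99) / 100 // ceil(p * n / 100)
    sorted((math.max(rank, 1L) - 1).toInt)
  }

  // Drops records produced before the warmup cutoff, then reports
  // e2e latency (sink - source) at the percentiles listed in the PR.
  def report(records: Seq[Record], warmupCutoffMs: Long): Seq[(String, Long)] = {
    val latencies = records
      .filter(_.sourceTimestampMs >= warmupCutoffMs)
      .map(r => r.sinkTimestampMs - r.sourceTimestampMs)
      .sorted
      .toIndexedSeq
    Seq(0, 50, 90, 95, 99, 100).map(p => (s"p$p", percentile(latencies, p)))
  }
}
```

A single slow record is enough to move p100 far from p99, which is consistent with the gap between 85 and 331 in the sample above.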

Why are the changes needed?

There is currently no benchmark to measure RTM stateless Kafka-to-Kafka latency. This makes it hard to quantify regressions or improvements to the RTM code path during local development
or before merging changes. This benchmark provides a repeatable, self-contained way to measure that, and follows the existing Spark benchmark framework so result files can be committed
and diffed across runs.

Does this PR introduce any user-facing change?

no

How was this patch tested?

N/A. Only a benchmark was added.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6 (claude-sonnet-4-6)

@jerrypeng
Contributor Author

@viirya thank you for your review! I have addressed your comments. PTAL.

@jerrypeng jerrypeng requested a review from viirya April 28, 2026 05:09
@jerrypeng
Contributor Author

@viirya I have addressed your comments. PTAL. Thanks in advance!

Member

@viirya viirya left a comment

The source-level run instructions are now consistent with Spark benchmark style: sql-kafka-0-10/Test/runMain ..., and output support via SPARK_GENERATE_BENCHMARK_FILES=1 is aligned with BenchmarkBase.

The PR description is stale: it still says RTMKafkaKafkaBenchmarkSuite and testOnly *RTMKafkaKafkaBenchmarkSuite. That should be updated to the new object name and Test/runMain command, but that is documentation cleanup rather than a code blocker.

private var spark: SparkSession = _
private var testUtils: KafkaTestUtils = _

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
Member

BenchmarkBase.main calls runBenchmarkSuite(args) and only calls afterAll() afterwards; it does not wrap runBenchmarkSuite in try/finally. This benchmark starts embedded Kafka and a local-cluster Spark session in runBenchmarkSuite, then relies on afterAll() for teardown. If benchmark(...) times out, the query fails, getLatencies throws, or setup partially fails after Kafka starts, afterAll() will not run, leaving Kafka/Spark resources behind. Since this benchmark intentionally runs heavyweight local resources, it should handle its own exception path, e.g. wrap setup/run in try/finally or call an idempotent cleanup method on failure.
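
A minimal sketch of the pattern requested above, assuming stand-in resources rather than the real Kafka broker and Spark session. `FakeResource`, `created`, and the `failDuringRun` flag are hypothetical names used only to make the example self-contained; the point is the shape of `try { ... } finally { cleanup() }` combined with a cleanup that tolerates partial setup and repeated calls.

```scala
import scala.collection.mutable.ArrayBuffer

object CleanupSketch {
  // Hypothetical stand-in for a heavyweight resource (broker, session).
  final class FakeResource(val name: String) {
    @volatile var stopped: Boolean = false
    def stop(): Unit = { stopped = true }
  }

  // Every resource ever created, kept only so the example can be checked.
  val created: ArrayBuffer[FakeResource] = ArrayBuffer.empty

  private var kafka: FakeResource = null
  private var spark: FakeResource = null

  private def start(name: String): FakeResource = {
    val r = new FakeResource(name)
    created += r
    r
  }

  // Idempotent: safe when setup only partially succeeded, and safe to call twice.
  def cleanup(): Unit = {
    if (spark != null) { spark.stop(); spark = null }
    if (kafka != null) { kafka.stop(); kafka = null }
  }

  def runBenchmarkSuite(failDuringRun: Boolean): Unit = {
    try {
      kafka = start("kafka")
      spark = start("spark")
      if (failDuringRun) throw new RuntimeException("query timed out")
    } finally {
      // Runs on success, on timeout, and on any exception thrown above.
      cleanup()
    }
  }
}
```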

Contributor Author

Sure, will add. That said, the resources would not really be leaked, because 1) Kafka runs in the same process and 2) the workers shut themselves down when the driver is not reachable.

Member

@viirya viirya left a comment

It looks almost good to merge except for one minor issue and PR description cleanup. After fixing that, we can merge this.

Member

@viirya viirya left a comment

By the way, you added a new benchmark but did not add a benchmark result file?

Without a benchmark result file, how do we know whether later PRs cause an improvement or a regression?

Could you run it and add the benchmark result too?

@jerrypeng
Contributor Author

@viirya added.

* Example output from a recent run (Linux x86_64, OpenJDK 17):
* {{{
* Kafka to kafka query e2e_latency in milliseconds is
* p0: 45
Member

Could you take a look at the existing benchmark result files? We usually record the benchmark environment in the result file. This benchmark's result file should include it too.

Contributor Author

added

Member

Hmm, where is it? It seems RTMKafkaKafkaBenchmark-results.txt has not been updated?

Contributor Author

@viirya it's here: connector/kafka-0-10-sql/benchmarks/RTMKafkaKafkaBenchmark-results.txt. It is part of the PR.

Member

I meant that you have not updated the benchmark result to include the benchmark environment.

Member

You can take a look at existing benchmark files like AvroWriteBenchmark-results.txt. They record the environment in which the benchmark ran, e.g.,

OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure
AMD EPYC 9V74 80-Core Processor
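
For readers unfamiliar with that header, the sketch below shows one way to assemble the first line from standard JVM system properties. It is an illustration only: the object and method names are hypothetical, and the benchmark itself is expected to rely on Spark's own helpers (getJVMOSInfo() and getProcessorName() are the ones named later in this review) rather than code like this. The processor line is omitted because reading it is platform-specific.

```scala
object EnvironmentHeaderSketch {
  // Builds "<vm name> <runtime version> on <os name> <os version>"
  // from standard JVM system properties.
  def jvmOsInfo(): String = {
    val vmName = System.getProperty("java.vm.name")
    val runtimeVersion = System.getProperty("java.runtime.version")
    val osName = System.getProperty("os.name")
    val osVersion = System.getProperty("os.version")
    s"$vmName $runtimeVersion on $osName $osVersion"
  }

  def main(args: Array[String]): Unit = {
    // Written at the top of the result output, before the latency numbers.
    println(jvmOsInfo())
  }
}
```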

Member

Oh, you also have not added it to the benchmark code. My original comment, #55420 (comment), suggested adding the environment header to the benchmark results.

Contributor Author

@jerrypeng jerrypeng May 16, 2026

Ah, I see. I added the environment information.

adding env info
.option("kafka.max.partition.fetch.bytes", "10485760") // 10MB
.load()

val currentTimestampUDF = udf(() => System.currentTimeMillis())
Member

Spark's built-in current_timestamp() in a streaming context is evaluated once per batch for determinism — which is the exact opposite of what this benchmark wants (per-row wall-clock timestamp). This is a subtle correctness point: anyone seeing a UDF wrapping System.currentTimeMillis() will be tempted to "clean it up" to the built-in and silently change the semantics. Please add an inline comment, e.g.:

  // UDF instead of current_timestamp(): the built-in is evaluated once per batch
  // for streaming determinism, but we want per-row wall-clock to measure per-record latency.
  val currentTimestampUDF = udf(() => System.currentTimeMillis())
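
The semantic difference described above can be shown without Spark. The sketch below is a plain-Scala simulation, not Spark code: `TickingClock` is a hypothetical fake clock that advances 1 ms per read, `stampOncePerBatch` mimics a value fixed once for the whole batch, and `stampPerRow` mimics a function that reads the clock for every row.

```scala
object TimestampSemanticsSketch {
  // Fake clock that advances by 1 ms on every read, so results are deterministic.
  final class TickingClock(start: Long) {
    private var now: Long = start
    def millis(): Long = { val t = now; now += 1; t }
  }

  // "Evaluate once per batch": the clock is read once and reused for every row.
  def stampOncePerBatch(rows: List[String], clock: TickingClock): List[(String, Long)] = {
    val batchTime = clock.millis()
    rows.map(r => (r, batchTime))
  }

  // "Per-row wall clock": the clock is read again for each row.
  def stampPerRow(rows: List[String], clock: TickingClock): List[(String, Long)] =
    rows.map(r => (r, clock.millis()))
}
```

With the per-batch variant every row carries the same timestamp, so per-record latency computed from it would collapse to a single value per batch.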

Contributor Author

will add a comment

Comment on lines +141 to +150
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(10000)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(10000)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
.drop(col("timestamp"))

Member

The observe(...) + drop chain is effectively dead.

The observed metrics are computed and then dropped — they don't go into the result file. They do surface in the Spark UI / log output via observe, but a reader can't tell that from the code. Either remove this section (if it's not needed) or add a comment stating that observed metrics are emitted to UI/log on purpose and are not part of the recorded result file.

Contributor Author

I include this intentionally, 1) to make sure that using the observe API does not cause latency issues for RTM and 2) to have the latency metrics in the streaming query progress event. I will add a comment.


private def unix_millis(column: Column): Column = {
(column.cast("timestamp").cast("double") * 1000).cast("long")
}
Member

This shadows the built-in SQL unix_millis function, which is confusing — a reader naturally assumes they should just use the built-in. Suggest:

  • Rename to toUnixMillis (or similar) to avoid the name collision.
  • Add a comment stating why a custom helper is needed (e.g. import conflict, or a specific cast-path requirement) and noting the precision implication of
    double * 1000 → long for timestamps with sub-millisecond precision.
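
To make the precision point concrete, here is a plain-Scala sketch of the same arithmetic, assuming the timestamp is held as microseconds since the epoch and that the cast to double yields seconds with a fractional part. `viaDouble` and `viaIntegerDivision` are hypothetical names for this example; neither is part of the benchmark.

```scala
object UnixMillisPrecisionSketch {
  // Mirrors the helper's cast path: seconds as Double, times 1000, truncated to Long.
  def viaDouble(epochMicros: Long): Long = {
    val seconds = epochMicros / 1000000.0
    (seconds * 1000).toLong
  }

  // Integer-only alternative, shown for comparison.
  def viaIntegerDivision(epochMicros: Long): Long =
    Math.floorDiv(epochMicros, 1000L)
}
```

Both paths discard the sub-millisecond part by truncation rather than rounding, so a timestamp ending in 123.999 ms is reported as 123 ms. That is harmless for latencies in the tens of milliseconds but worth stating in the helper's comment.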

Contributor Author

will rename

Member

@viirya viirya left a comment

A lot of values are hardcoded in-line, and their choice materially affects the benchmark result:

benchmark(60.seconds.toMillis, 4)
genData(testUtils.brokerAddress, inputTopic, 1000)
.master("local-cluster[3, 5, 1024]")
val numBatchesToFilter = 2
.option("kafka.fetch.max.wait.ms", "10")
.option("kafka.max.partition.fetch.bytes", "10485760")
.option("kafka.buffer.memory", "67108864")
spark.conf.set(SQLConf.STREAMING_POLLING_DELAY.key, 10)

Suggest lifting these to private val at the top of the object with a short comment per value explaining why it was chosen. As-is, future contributors can't tell which knobs are load-bearing vs. arbitrary, and any of them could silently change the benchmark's meaning.
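
A sketch of what that could look like, using the values quoted in this comment. The object name and most field names are hypothetical, and the comments only describe what each value is; the actual rationale for each choice would have to come from the author.

```scala
import scala.concurrent.duration._

object BenchmarkKnobsSketch {
  // Upper bound on how long to wait for the query to finish the requested batches.
  val queryTimeout: FiniteDuration = 60.seconds
  // Number of batches to run before reading back the output topic.
  val numBatches: Int = 4
  // Leading batches excluded from the reported percentiles as warmup.
  val numBatchesToFilter: Int = 2
  // Rate of the synthetic producer, in records per second.
  val recordsPerSecond: Int = 1000

  // local-cluster shape: workers, cores per worker, memory per worker in MB.
  val numWorkers: Int = 3
  val coresPerWorker: Int = 5
  val memoryPerWorkerMb: Int = 1024
  val master: String = s"local-cluster[$numWorkers, $coresPerWorker, $memoryPerWorkerMb]"

  // Kafka client options, kept as strings because they are passed to .option(...).
  val kafkaFetchMaxWaitMs: String = "10"
  val kafkaMaxPartitionFetchBytes: String = (10 * 1024 * 1024).toString // 10 MB
  val kafkaBufferMemory: String = (64 * 1024 * 1024).toString // 64 MB

  // Value assigned to SQLConf.STREAMING_POLLING_DELAY in the snippet above.
  val streamingPollingDelay: Long = 10L
}
```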

Member

@viirya viirya left a comment

PR description doesn't match the implementation.

The PR body says the benchmark generates "synthetic records at 1,000 records/sec" and "runs N batches" — but the code hardcodes 1000 and 4. Please update either the description (to match the actual constants) or the implementation (to accept these via mainArgs).

topicName,
java.lang.Long.toString(i),
java.lang.Long.toString(System.currentTimeMillis())
),
Member

@viirya viirya May 17, 2026

Optional: genData writes a producer-side timestamp into the record value but it's never read.

The benchmark uses the Kafka record-level timestamp (broker-assigned) as the source timestamp instead, so this value is effectively unused. Two options:

  • Switch to using this value as source-timestamp — it's a more accurate "when did the producer hand this off" measurement and is robust to scenarios where
    input and output topics might live on different brokers in future variants of this benchmark.
  • Or, if Kafka's record timestamp is the intended source-of-truth, drop the System.currentTimeMillis() write and just send a counter, so the code doesn't
    mislead readers.

Contributor Author

The benchmark is not using log append time (the broker-assigned timestamp). Kafka's default is the producer-assigned timestamp. The timestamp field returned by the Kafka source is the timestamp defined here.

.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("rtm-benchmark").toString)
Member

@viirya viirya May 17, 2026

Optional: checkpoint directory isn't cleaned up.

The temp dir leaks across runs. Capture the path and Utils.deleteRecursively it in cleanup().
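
A minimal sketch of that suggestion using only the JDK, so it runs without Spark on the classpath. The object name and the local `deleteRecursively` helper are assumptions for this example; in the benchmark the Spark utility mentioned above would take the helper's place.

```scala
import java.io.File
import java.nio.file.{Files, Path}

object CheckpointDirCleanupSketch {
  // Remembered so cleanup() can remove it later; null means nothing to clean.
  private var checkpointDir: Path = null

  def createCheckpointDir(): Path = {
    checkpointDir = Files.createTempDirectory("rtm-benchmark")
    checkpointDir
  }

  // Depth-first delete: children first, then the directory itself.
  private def deleteRecursively(file: File): Unit = {
    if (file.isDirectory) {
      val children = file.listFiles()
      if (children != null) children.foreach(deleteRecursively)
    }
    file.delete()
    ()
  }

  // Idempotent: a second call finds nothing to delete.
  def cleanup(): Unit = {
    if (checkpointDir != null) {
      deleteRecursively(checkpointDir.toFile)
      checkpointDir = null
    }
  }
}
```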

Contributor Author

will address

*
* See `benchmarks/RTMKafkaKafkaBenchmark-results.txt` for a recorded run.
*/
object RTMKafkaKafkaBenchmark extends BenchmarkBase with Logging {
Member

Optional: justify "why not Benchmark.run()".

This is the only BenchmarkBase subclass in the repo that manually emits getJVMOSInfo() / getProcessorName() and doesn't use Benchmark.run() /
Benchmark.addCase(). The reason makes sense (you're measuring a latency distribution across a streaming pipeline, not "run this synchronous function N times"), but a one-line note in the class scaladoc would save future readers a head-scratch:

Unlike most Spark benchmarks, this one does not use Benchmark.run(): the metric of interest is end-to-end latency percentiles across a streaming pipeline, which doesn't fit the Best/Avg/Stdev table format. Environment header is emitted manually for consistency with other result files.

Contributor Author

will add comment

val minimumSourceTimestamp =
kafkaSinkData.agg(min("source-timestamp")).collect()(0)(0).asInstanceOf[Long]

val numBatchesToFilter = 2
Member

Optional: default numBatches = 4 with numBatchesToFilter = 2 only leaves 2 effective batches

That puts the default very close to the failure threshold (filteredSink.count() == 0 → RuntimeException) under any kind of jitter. Suggest bumping the default numBatches to 8 or 10 so the common case has more headroom; the error message already tells users to increase it but it would be friendlier not to hit it by default.

Contributor Author

Two batches is significant headroom, to be honest. I would rather do this instead: filter out only 1 batch.

Comment on lines +176 to +181
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if (event.progress.batchId == numBatches - 1) {
latch.countDown()
}
}
}
Member

Optional: latch logic depends on batchId numbering.

This assumes batchId starts at 0 and increases monotonically by 1. True today for a fresh query, but a counter would be more robust and more obviously correct:

  private val batchesCompleted = new AtomicLong(0)
  override def onQueryProgress(event: ...): Unit = {
    if (batchesCompleted.incrementAndGet() >= numBatches) latch.countDown()
  }
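
A self-contained version of the counter-based idea, with the streaming listener replaced by a plain method so it can run without Spark. `BatchCompletionTracker`, `onBatchCompleted`, and `await` are hypothetical names; in the benchmark the increment would sit inside onQueryProgress.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

object BatchCounterLatchSketch {
  final class BatchCompletionTracker(numBatches: Int) {
    private val batchesCompleted = new AtomicLong(0)
    private val latch = new CountDownLatch(1)

    // Call once per progress event. The batch id is deliberately ignored,
    // so the logic does not depend on ids starting at 0 or being contiguous.
    def onBatchCompleted(): Unit = {
      if (batchesCompleted.incrementAndGet() >= numBatches) latch.countDown()
    }

    // Returns true if the target number of batches completed within the timeout.
    def await(timeoutMs: Long): Boolean =
      latch.await(timeoutMs, TimeUnit.MILLISECONDS)
  }
}
```

Extra events after the target is reached call countDown() again, which has no effect once the latch is at zero.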

Contributor Author

Sure

.withColumn("headers-map", map_from_entries(col("headers")))
.withColumn("source-timestamp",
col("headers-map.source-timestamp").cast("STRING").cast("BIGINT"))
.withColumn("sink-timestamp", unix_millis(col("timestamp")))
Member

@viirya viirya May 17, 2026

Document the "single broker" assumption behind the latency formula.

  // source side (input topic read):
  unix_millis(col("timestamp"))  // ← input topic's broker-assigned timestamp, stored in header

  // sink side (output topic read):
  .withColumn("sink-timestamp", unix_millis(col("timestamp")))  // ← output topic's broker-assigned timestamp

source-timestamp and sink-timestamp come from two different Kafka read contexts — they're the record timestamps stamped by the broker on the input and output topics respectively. The sink - source formula only measures true latency because KafkaTestUtils runs a single embedded broker, so both timestamps share the same wall clock.

If someone later adapts this benchmark to a multi-broker setup (e.g. to measure cross-cluster shuffle latency), the formula will silently start including clock skew between brokers and there's nothing in the code to flag it. Worth a comment near the sink-timestamp definition, and/or switching to the producer-side System.currentTimeMillis() that's already being written into the record value — that one is clock-skew-immune by construction.
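
The effect of skew on the formula is simple arithmetic, sketched below. The offsets are hypothetical quantities describing how far each stamping host's clock is from true time; whichever hosts actually assign the two timestamps, the measured value is the true latency plus the difference of their offsets.

```scala
object ClockSkewSketch {
  // eventTimeMs: true time at which the source timestamp is assigned.
  // sourceClockOffsetMs / sinkClockOffsetMs: each stamping host's clock error.
  def measuredLatencyMs(
      eventTimeMs: Long,
      trueLatencyMs: Long,
      sourceClockOffsetMs: Long,
      sinkClockOffsetMs: Long): Long = {
    val sourceTimestamp = eventTimeMs + sourceClockOffsetMs
    val sinkTimestamp = eventTimeMs + trueLatencyMs + sinkClockOffsetMs
    sinkTimestamp - sourceTimestamp
  }
}
```

When both timestamps come from the same clock the offsets cancel, which is the single-host case this benchmark runs in.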

Contributor Author

Clock skew is fairly minimal in practice. For example, within one AZ on AWS the clock skew is under 100 microseconds, and between AZs it is typically under 1 ms.

@jerrypeng
Contributor Author

@viirya Thank you for your review! I have addressed your feedback. PTAL.

@jerrypeng jerrypeng requested a review from viirya May 19, 2026 06:02

// Checkpoint interval for the streaming query. 5-minute is recommended.
// Lowering it may cause more frequent checkpointing but can increase latency.
private val checkpointInterval = 5.minutes
Member

@viirya viirya May 20, 2026

The parameter of RealTimeTrigger(batchDurationMs: Long) is the duration of each batch, not a checkpoint interval. This variable name doesn't match the semantics of RealTimeTrigger.batchDurationMs. I suggest renaming it to batchDuration or realTimeBatchDuration and updating the comment.

Contributor Author

Conceptually it is a checkpoint interval since RTM executes batches for a fixed amount of time. I would like users reading this to also understand this.

Member

Hmm, I don't get it. This variable is used for RealTimeTrigger's batchDurationMs. Do you mean batchDurationMs is actually a checkpoint interval? If so, why is the parameter currently named batchDurationMs?

Member

I think I see what you mean: in RTM, batches run for a fixed amount of time, and the batch boundary is also where progress/checkpoint-like work is committed, so this duration effectively influences the checkpoint/progress cadence.

That said, since this value is passed directly to RealTimeTrigger(...), whose API names it batchDurationMs, I think calling the benchmark variable checkpointInterval is still a bit misleading. A reader may assume it is a checkpoint-specific configuration rather than the RTM batch duration.

Could we name it realTimeBatchDuration or batchDuration, and keep your point in the comment? For example:

// Duration of each RTM batch. Since RTM commits progress/checkpoints at batch
// boundaries, this also controls the effective checkpoint/progress cadence.
private val realTimeBatchDuration = 5.minutes

This keeps the variable aligned with the public API while still explaining the RTM checkpoint/progress behavior.

Contributor Author

sure

@viirya
Member

viirya commented May 20, 2026

A few comments.

…afka010/benchmark/RTMKafkaKafkaBenchmark.scala

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
@jerrypeng
Contributor Author

Thanks @viirya, I have addressed the additional comments. PTAL.

@viirya viirya closed this in fa8c480 May 22, 2026
viirya pushed a commit that referenced this pull request May 22, 2026
Closes #55420 from jerrypeng/SPARK-56543.

Authored-by: Boyang Jerry Peng <jerry.peng@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit fa8c480)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
@viirya
Member

viirya commented May 22, 2026

Merged to master/branch-4.x.

Thanks @jerrypeng
