Skip to content

Conversation

@ericm-db
Copy link
Contributor

@ericm-db ericm-db commented Dec 6, 2025

What changes were proposed in this pull request?

This PR decouples the versioning of OffsetSeqMetadata from OffsetLog, allowing them to evolve independently while maintaining backward compatibility
with existing streaming checkpoints.

Key changes:

  1. Introduced OffsetSeqMetadataBase trait to abstract over metadata versions (V1 and V2)
  2. Added OffsetSeqMetadataV2 with new fields:
    - sourceMetadataInfo: Map of source metadata keyed by sourceId
    - controlBatchInfo: Information about control batches (e.g., rewind batches)
  3. Added OffsetSeqV2 class for VERSION_2 offset sequences
  4. Added STREAMING_OFFSET_LOG_FORMAT_VERSION config to control offset log versioning
  5. Updated OffsetSeqLog to support both VERSION_1 and VERSION_2 serialization formats
  6. Updated method signatures throughout the codebase to accept base types (OffsetSeqMetadataBase, OffsetSeqBase) instead of concrete types

Why are the changes needed?

Previously, OffsetSeqMetadata version was tightly coupled to the OffsetLog version. This meant that any change to metadata format would require
bumping the entire offset log version, making it difficult to evolve metadata independently.

This change enables:

  • Independent versioning of offset metadata without breaking checkpoint compatibility
  • Future extensibility for adding new metadata fields (e.g., source metadata, control batch information)
  • Better type safety through trait-based abstraction

Does this PR introduce any user-facing change?

No. The changes are backward compatible:

  • Existing checkpoints continue to work (VERSION_1 is the default)
  • The new VERSION_2 format is only used when explicitly configured via spark.sql.streaming.offsetLog.formatVersion=2

How was this patch tested?

  • Added unit tests for OffsetSeqMetadataV2 serialization/deserialization
  • Added tests for VERSION_2 offset log format with control batch information

Was this patch authored or co-authored using generative AI tooling?

No

@dongjoon-hyun dongjoon-hyun marked this pull request as draft December 6, 2025 23:39
@ericm-db ericm-db changed the title [WIP][SS] Adding versioning and SQLConf to control OffsetLog version [SPARK-54583][SS] Add SQLConf to enable use of OffsetMap Dec 6, 2025
@ericm-db ericm-db marked this pull request as ready for review December 6, 2025 23:45
Copy link
Contributor

@anishshri-db anishshri-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm pending green CI

Copy link
Contributor

@liviazhu liviazhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be more clear about the mappings between OffsetSeqMetadata and OffsetSeq/OffsetMap? Can any version of OffsetSeqMetadata be used with OffsetSeq? Can any version of metadata be used with OffsetMap? Looking at the PR description the answer is yet, but there's only 1 conf for both. How do we control the metadata version, do we need to introduce another conf?

object OffsetSeqLog {
private[streaming] val VERSION_1 = 1
private[streaming] val VERSION_2 = 2
val VERSION_1 = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we scope this?

val metadata = v1.metadataOpt.get.asInstanceOf[OffsetSeqMetadata]
v1.copy(metadataOpt = Some(metadata.copy(
conf = metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString))))
case _ => throw OfflineStateRepartitionErrors.unsupportedOffsetSeqVersionError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to create a TODO + add ticket to support OffsetMap here?

*/
case class OffsetSeqV2(
offsets: Seq[Option[OffsetV2]],
metadataOpt: Option[OffsetSeqMetadataV2] = None) extends OffsetSeqBase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we require metadata for OffsetSeqV2 (don't make it an option)?

case class OffsetMap(
offsetsMap: Map[String, Option[OffsetV2]],
metadataOpt: Option[OffsetSeqMetadata] = None) extends OffsetSeqBase {
metadataOpt: Option[OffsetSeqMetadataBase] = None) extends OffsetSeqBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants