[SPARK-53332][SS] Enable StateDataSource with state checkpoint v2 (only snapshotStartBatchId option) #52202
dylanwong250 wants to merge 15 commits into
| /** Helper class for [[RocksDBCheckpointMetadata]] */ | ||
| object RocksDBCheckpointMetadata { | ||
| val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion | ||
| val VERSION = 1 |
I want to point out this change. I believe using val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion is incorrect in this case, and I was seeing a few test failures with it. SQLConf.get.stateStoreCheckpointFormatVersion refers to the checkpoint format version, not the metadata format version. Additionally, since this is a val inside an object, it is initialized once and does not track the current value of SQLConf.get.stateStoreCheckpointFormatVersion. I think this VERSION may also be shared incorrectly.
I also experimented with having VERSION equal to the current value, but I ran into a few issues with the maintenance threads seeing a different version than the streaming query they were run in.
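To illustrate the initialization point, here is a minimal sketch of the difference between a val and a def inside an object. FakeConf, CapturedOnce and ReadEachTime are hypothetical stand-ins made up for this example; they are not Spark's SQLConf or RocksDBCheckpointMetadata.

```scala
// Hypothetical stand-in for a mutable, session-scoped config (NOT Spark's SQLConf).
object FakeConf {
  @volatile var checkpointFormatVersion: Int = 1
}

object CapturedOnce {
  // Evaluated a single time, when the object is first initialized.
  // Later changes to FakeConf are never observed through this field.
  val VERSION: Int = FakeConf.checkpointFormatVersion
}

object ReadEachTime {
  // Re-evaluated on every call, so it always reflects the current config value.
  def version: Int = FakeConf.checkpointFormatVersion
}
```

If CapturedOnce.VERSION is first read while the config is 1 and the config is later set to 2, CapturedOnce.VERSION still returns 1 while ReadEachTime.version returns 2. A hard-coded constant avoids the first behavior entirely.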
| .replayStateFromSnapshot( | ||
| opts.snapshotVersion, | ||
| opts.endVersion, | ||
| readOnly = false, |
why is readOnly false for state reader?
I initially set readOnly = false to match the default value of replayStateFromSnapshot without the uniqueIds. Also, looking at the previous PR for snapshot reading, #46944, it seems the readOnly option did not exist at the time. I changed it to readOnly = true and all the tests pass, so I think it is better to switch to readOnly = true.
| } | ||
| /** Snapshot options specialized for a single state store handler. */ | ||
| case class HandlerSnapshotOptions( |
Made private[join]
| startKeyWithIndexToValueStateStoreCkptId: Option[String] = None, | ||
| endKeyToNumValuesStateStoreCkptId: Option[String] = None, | ||
| endKeyWithIndexToValueStateStoreCkptId: Option[String] = None) { | ||
| def getKeyToNumValuesHandlerOpts(): HandlerSnapshotOptions = |
Fixed. I used https://github.com/databricks/scala-style-guide?tab=readme-ov-file#spacing-and-indentation for reference.
| private val keyToNumValuesStateStoreCkptId = if (joinSide == LeftSide) { | ||
| private val endStateStoreCheckpointIds = | ||
| SymmetricHashJoinStateManager.getStateStoreCheckpointIds( | ||
| partition.partition, |
nit: indent for the args ?
| protected class KeyWithIndexToValueStore(stateFormatVersion: Int) | ||
| extends StateStoreHandler(KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId) { | ||
| protected class KeyWithIndexToValueStore( | ||
| stateFormatVersion: Int, |
nit: this should be 4 spaces here ?
…ly snapshotStartBatchId option)
### What changes were proposed in this pull request?
This PR enables StateDataSource support with state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration.
The replayStateFromSnapshot method signature changes: it gains two parameters, `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId`. Both are needed because `snapshotVersionStateStoreCkptId` is used when getting the snapshot and `endVersionStateStoreCkptId` is used for calculating the full lineage from the final version.
Before
```
def replayStateFromSnapshot(
snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
```
After
```
def replayStateFromSnapshot(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean = false,
    snapshotVersionStateStoreCkptId: Option[String] = None,
    endVersionStateStoreCkptId: Option[String] = None): StateStore
```
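Because both new parameters default to `None`, existing call sites that pass only the versions should keep compiling. The sketch below models that with a toy method that mirrors the parameter list. `ToyProvider` and `ReplayRequest` are hypothetical names for illustration only; the real method returns a `StateStore`, and the id strings in the usage note are placeholders.

```scala
// Toy record of what a caller asked for (hypothetical, NOT a Spark type).
final case class ReplayRequest(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean,
    snapshotVersionStateStoreCkptId: Option[String],
    endVersionStateStoreCkptId: Option[String])

// Toy stand-in mirroring the new parameter list (NOT Spark's provider).
object ToyProvider {
  def replayStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean = false,
      snapshotVersionStateStoreCkptId: Option[String] = None,
      endVersionStateStoreCkptId: Option[String] = None): ReplayRequest =
    // Only records the arguments; the real method loads and replays state.
    ReplayRequest(
      snapshotVersion,
      endVersion,
      readOnly,
      snapshotVersionStateStoreCkptId,
      endVersionStateStoreCkptId)
}
```

A checkpoint v1 style call such as `ToyProvider.replayStateFromSnapshot(2L, 5L)` leaves both ids as `None`, while a checkpoint v2 style call passes them by name, for example `snapshotVersionStateStoreCkptId = Some("id-a")` and `endVersionStateStoreCkptId = Some("id-b")`.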
This is the final PR in the series following:
- apache#52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
- apache#52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)
NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.
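As a rough sketch of what a snapshot-based read could look like under this requirement, the helper below only assembles the session config and reader options as plain maps. `SnapshotReadSketch` and `readerOptions` are hypothetical names. The `snapshotPartitionId` and `batchId` option names are taken from my understanding of the State Data Source documentation and are not stated in this PR, so treat them as assumptions.

```scala
object SnapshotReadSketch {
  // Config key quoted in the NOTE above.
  val CheckpointFormatVersionKey: String =
    "spark.sql.streaming.stateStore.checkpointFormatVersion"

  // Session-level setting the NOTE says is required for reading v2 checkpoints.
  val requiredConf: Map[String, String] = Map(CheckpointFormatVersionKey -> "2")

  // Hypothetical helper: builds reader options for a snapshot-based read.
  // "snapshotPartitionId" and "batchId" are assumed option names (see lead-in).
  def readerOptions(
      snapshotStartBatchId: Long,
      snapshotPartitionId: Int,
      batchId: Option[Long] = None): Map[String, String] = {
    val base = Map(
      "snapshotStartBatchId" -> snapshotStartBatchId.toString,
      "snapshotPartitionId" -> snapshotPartitionId.toString)
    // Add the end batch only when the caller supplies one.
    batchId.fold(base)(id => base + ("batchId" -> id.toString))
  }
}

// With a SparkSession `spark` and a checkpoint location `checkpointPath`
// (both assumed to exist), usage might look like:
//   SnapshotReadSketch.requiredConf.foreach { case (k, v) => spark.conf.set(k, v) }
//   spark.read.format("statestore")
//     .options(SnapshotReadSketch.readerOptions(1L, 0))
//     .load(checkpointPath)
```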
### Why are the changes needed?
State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in apache#52047 and only `readChangeFeed` was implemented in apache#52148.
### Does this PR introduce _any_ user-facing change?
Yes.
State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are used.
### How was this patch tested?
In the previous PRs, test suites were added to parameterize the current tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including Python tests).
`RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added, which uses the golden file approach similar to apache#46944, where `snapshotStartBatchId` was first added.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes apache#52202 from dylanwong250/SPARK-53332.
Authored-by: Dylan Wong <dylan.wong@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>