Introducing OperatorStateMetadataV2 for the TransformWithState operator#6

Closed
ericm-db wants to merge 22 commits into
tws-state-metadata-changesfrom
tws-operator-state-metadata

Conversation

@ericm-db
Owner

@ericm-db ericm-db commented May 20, 2024

What changes were proposed in this pull request?

Introducing the OperatorStateMetadataV2 class for the TransformWithState operator. In addition to general operator information, this class stores a map of arbitrary operator properties. We use this map to capture column family information that is available only on the executors, not on the driver. Executors relay this information back to the driver via an Accumulator.
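
As a rough illustration of the shape described above, the following is a minimal sketch, not the code from this PR. The names `MetadataV1`, `MetadataV2`, `OperatorInfo`, and `MetadataSketch` are hypothetical, and the accumulator is approximated by a plain fold over per-task maps, assuming later values win on key clashes.

```scala
// Hypothetical stand-ins; these are not the classes introduced by the PR.
sealed trait OperatorStateMetadata { def version: Int }

final case class OperatorInfo(operatorId: Long, operatorName: String)

final case class MetadataV1(info: OperatorInfo) extends OperatorStateMetadata {
  val version: Int = 1
}

// V2 adds a free-form property map on top of the general operator info.
final case class MetadataV2(
    info: OperatorInfo,
    operatorProperties: Map[String, String]) extends OperatorStateMetadata {
  val version: Int = 2
}

object MetadataSketch {
  // Combine the property maps reported by each task, roughly as an
  // accumulator merge would. Assumption: later maps override earlier keys.
  def mergeProperties(perTask: Seq[Map[String, String]]): Map[String, String] =
    perTask.foldLeft(Map.empty[String, String])(_ ++ _)

  // Build the driver-side metadata from executor-reported properties.
  def build(info: OperatorInfo, perTask: Seq[Map[String, String]]): OperatorStateMetadata =
    MetadataV2(info, mergeProperties(perTask))
}
```

Because the trait is sealed, a `match` over `OperatorStateMetadata` is checked for exhaustiveness by the compiler, which is one way to avoid a separate version check.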

This change only covers writing the OperatorStateMetadata info out to the file. Eviction, deduplication, and failure scenarios will be handled in future PRs.

Why are the changes needed?

These changes are needed as a part of the State Reader integration with the TransformWithState operator.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Via unit test. We verify that the operatorProperties information collected on the executors is propagated correctly to the driver and written out correctly.

Was this patch authored or co-authored using generative AI tooling?

No

@ericm-db ericm-db changed the title TransformWithState OperatorStateMetadata changes Introducing OperatorStateMetadataV2 for the TransformWithState operator May 20, 2024
@anishshri-db anishshri-db self-requested a review May 21, 2024 20:05
)
require(operatorStateMetadata.version == 1 || operatorStateMetadata.version == 2)
operatorStateMetadata match {
case v1: OperatorStateMetadataV1 =>
Collaborator

Maybe we could refactor this?

currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
currentTriggerLatestOffsets != null
)
if (lastExecution.isFirstBatch) {
Collaborator

Could we do this for each new runId?

@ericm-db ericm-db closed this May 29, 2024
@ericm-db ericm-db reopened this Jun 3, 2024
@ericm-db ericm-db closed this Jul 23, 2024
ericm-db pushed a commit that referenced this pull request Jul 26, 2024
…to the `hive-thriftserver` module to fix the Maven daily test

### What changes were proposed in this pull request?
This PR adds bouncycastle-related test dependencies to the `hive-thriftserver` module to fix the Maven daily test.

### Why are the changes needed?
`sql-on-files.sql` added the following statement in apache#47480, which caused the Maven daily test to fail:

https://github.com/apache/spark/blob/2363aec0c14ead24ade2bfa23478a4914f179c00/sql/core/src/test/resources/sql-tests/inputs/sql-on-files.sql#L10

- https://github.com/apache/spark/actions/runs/10094638521/job/27943309504
- https://github.com/apache/spark/actions/runs/10095571472/job/27943298802

```
- sql-on-files.sql *** FAILED ***
  "" did not contain "Exception" Exception did not match for query #6
  CREATE TABLE sql_on_files.test_orc USING ORC AS SELECT 1, expected: , but got: java.sql.SQLException
  org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8542.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8542.0 (TID 8594) (localhost executor driver): java.lang.NoClassDefFoundError: org/bouncycastle/jce/provider/BouncyCastleProvider
    at test.org.apache.spark.sql.execution.datasources.orc.FakeKeyProvider$Factory.createProvider(FakeKeyProvider.java:127)
    at org.apache.hadoop.crypto.key.KeyProviderFactory.get(KeyProviderFactory.java:96)
    at org.apache.hadoop.crypto.key.KeyProviderFactory.getProviders(KeyProviderFactory.java:68)
    at org.apache.orc.impl.HadoopShimsCurrent.createKeyProvider(HadoopShimsCurrent.java:97)
    at org.apache.orc.impl.HadoopShimsCurrent.getHadoopKeyProvider(HadoopShimsCurrent.java:131)
    at org.apache.orc.impl.CryptoUtils$HadoopKeyProviderFactory.create(CryptoUtils.java:158)
    at org.apache.orc.impl.CryptoUtils.getKeyProvider(CryptoUtils.java:141)
    at org.apache.orc.impl.WriterImpl.setupEncryption(WriterImpl.java:1015)
    at org.apache.orc.impl.WriterImpl.<init>(WriterImpl.java:164)
    at org.apache.orc.OrcFile.createWriter(OrcFile.java:1078)
    at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.<init>(OrcOutputWriter.scala:49)
    at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anon$1.newInstance(OrcFileFormat.scala:89)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:180)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:165)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:391)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:107)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
    at org.apache.spark.scheduler.Task.run(Task.scala:146)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
  Caused by: java.lang.ClassNotFoundException: org.bouncycastle.jce.provider.BouncyCastleProvider
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    ... 32 more
```

Because we have configured `hadoop.security.key.provider.path` as `test:///` in the parent `pom.xml`,

https://github.com/apache/spark/blob/5ccf9ba958f492c1eb4dde22a647ba75aba63d8e/pom.xml#L3165-L3166

`KeyProviderFactory#getProviders` will use `FakeKeyProvider$Factory` to create instances of `FakeKeyProvider`.

https://github.com/apache/spark/blob/5ccf9ba958f492c1eb4dde22a647ba75aba63d8e/sql/core/src/test/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory#L18

During the initialization of `FakeKeyProvider`, its superclass `org.apache.hadoop.crypto.key.KeyProvider` is initialized first, which triggers loading of the `BouncyCastleProvider` class. Therefore, we need to add bouncycastle-related test dependencies to the `hive-thriftserver` module.
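
A quick way to confirm whether a class such as `BouncyCastleProvider` is visible to a module's test classpath is a small probe like the one below. This is only a sketch; `ClasspathCheck` and `isLoadable` are hypothetical names, not part of Spark or Hadoop.

```scala
import scala.util.Try

object ClasspathCheck {
  // Returns true when the named class can be found by this class loader.
  // The `false` argument asks the JVM not to run static initializers.
  // A missing class raises ClassNotFoundException, which Try captures.
  def isLoadable(className: String): Boolean =
    Try(Class.forName(className, false, getClass.getClassLoader)).isSuccess
}
```

Calling `ClasspathCheck.isLoadable("org.bouncycastle.jce.provider.BouncyCastleProvider")` from a test in the affected module would be expected to return `false` before the dependency is added, assuming nothing else pulls it in transitively.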

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested with this PR.

```
build/mvn -Phive -Phive-thriftserver clean install -DskipTests
build/mvn -Phive -Phive-thriftserver clean install -Dtest=none -DwildcardSuites=org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite -pl sql/hive-thriftserver
```

```
Run completed in 6 minutes, 52 seconds.
Total number of tests run: 243
Suites: completed 2, aborted 0
Tests: succeeded 243, failed 0, canceled 0, ignored 20, pending 0
All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47496 from LuciferYang/thrift-bouncycastle.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
ericm-db pushed a commit that referenced this pull request Jan 31, 2025
…anRelationPushDown

### What changes were proposed in this pull request?

Add the timezone information to a cast expression when the destination type requires it.

### Why are the changes needed?

When current_timestamp() is materialized as a string, the timezone information is lost (e.g., 2024-12-27 10:26:27.684158), which prevents further optimization rules from being applied to the affected data source.

For example,

```
Project [1735900357973433#10 AS current_timestamp()#6]
+- 'Project [cast(2025-01-03 10:32:37.973433#11 as timestamp) AS 1735900357973433#10]
   +- RelationV2[2025-01-03 10:32:37.973433#11] xxx
```

-> This query fails to execute because the injected cast expression lacks the timezone information.
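
The ambiguity can be reproduced outside Spark with plain `java.time`. The sketch below (with the hypothetical helper `ZoneSketch.toEpochMicros`) parses the zone-less string from the plan above and shows that the resulting epoch value depends entirely on the offset supplied at cast time.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object ZoneSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")

  // Interpret a zone-less timestamp string at the given offset and
  // return microseconds since the Unix epoch.
  def toEpochMicros(text: String, offset: ZoneOffset): Long = {
    val instant = LocalDateTime.parse(text, fmt).toInstant(offset)
    instant.getEpochSecond * 1000000L + instant.getNano / 1000
  }
}
```

With `ZoneOffset.UTC`, the string `2025-01-03 10:32:37.973433` maps to `1735900357973433`, the literal seen in the plan. With `ZoneOffset.ofHours(-8)` it maps to `1735929157973433`, so a cast without a zone has no single correct answer.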

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49549 from changgyoopark-db/SPARK-50870.

Authored-by: changgyoopark-db <changgyoo.park@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ericm-db pushed a commit that referenced this pull request May 5, 2026
### What changes were proposed in this pull request?

Address the open follow-ups from [SPARK-56681](https://issues.apache.org/jira/browse/SPARK-56681) (umbrella for PATH / SPARK-56605 cleanup) in a single cleanup PR. Items #1 and #2 were already wired by SPARK-56639; this PR covers the remainder.

| # | Item | Resolution |
|---|---|---|
| #1 | `FunctionResolution.resolveProcedure` was dead code | Already wired by SPARK-56639 (no action). |
| #2 | Frozen view / SQL-function PATH wiring unfinished | Already done by SPARK-56639 (no action). |
| #3 | `AnalysisContext.resolutionPathEntries` threadlocal | Audit only: confirmed `withNewAnalysisContext` / `reset()` correctly clear it. Full removal needs a coordinated refactor to plumb the path through `RelationResolution` / `FunctionResolution` method calls; flagged as a follow-up. |
| #4 | `Analyzer.executeAndCheck` clobbers outer `SQLConf.withExistingConf` | Extracted `runWithSessionConf` helper, added `SQLConf.getExistingConfIfSet`. `executeAndCheck` and `executeSameContext` now share one path that yields to any outer scope. |
| #5 | `VariableResolution.allowUnqualifiedSessionTempVariableLookup` force-loads default catalog | Replaced the hot-path catalog read with `CatalogManager.isSystemSessionOnPath`, which inspects stored session-path entries directly. No catalog load on column resolution. |
| #6 | `DROP VARIABLE` PATH gate asymmetric with `DECLARE` / `CREATE` | Removed the gate. DDL on session variables (`DECLARE` / `CREATE` / `DROP`) always targets `system.session` directly; only DML (`SET VAR`, `SELECT x`) goes through PATH. |
| #7 | `lookupFunctionType` exception swallow too broad | Narrowed from `NonFatal` to the explicit not-found list (`NoSuchFunctionException`, `NoSuchNamespaceException`, `CatalogNotFoundException`, `FORBIDDEN_OPERATION`). Other exceptions propagate. |
| #8 | `lookupFunctionType` fan-out had wasteful `system.*` candidates | Filtered them out — `system.session`, `system.builtin`, `system.ai` are already resolved earlier in the same method. |
| #9 | Three near-duplicate path-resolution helpers | Lifted into `CatalogManager.resolutionPathEntriesForAnalysis(pinnedEntries, viewCatalogAndNamespace)`. Relation, routine, and procedure resolution all route through it. |
| #10 | Tests for the new error paths and gates | Added a DECLARE / SET VAR / DROP cycle test under non-default PATH and a struct-variable field-vs-qualified ambiguity test in `sql-session-variables.sql`. |
| #11 | `ProtoToParsedPlanTestSuite.analyzerIsolationConf` was a bare `SQLConf` | Clone `spark.sessionState.conf` and only override `PATH_ENABLED=false`, so all `sparkConf` overrides (ANSI, alias config, ...) propagate automatically. |
| Bonus | `ResolveSetVariable` hardcoded `SYSTEM.SESSION` regardless of actual PATH | `unresolvedVariableError` now takes `Seq[Seq[String]]` path entries with **required** `Origin` (no overloads). DML lookup failures (`SET VAR`, `FETCH ... INTO`) report the full SQL path as a bracketed list, byte-for-byte consistent with `UNRESOLVED_ROUTINE` and `TABLE_OR_VIEW_NOT_FOUND`. DDL name validation in `ResolveCatalogs` continues to report `[system.session]` since PATH does not apply there. Origin is plumbed through `VariableManager.set` so all error sites carry a `queryContext` pointing at the offending variable identifier (parser opt-ins via `withOrigin(identifierReference)` so the highlight is the variable name, not the whole statement). |
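
Item #7 in the table is the kind of change that is easier to see in code. The following is a simplified sketch under stated assumptions: the exception classes and `LookupSketch` are hypothetical stand-ins, not Spark's actual types or the body of `lookupFunctionType`.

```scala
import scala.util.control.NonFatal

// Hypothetical exception types standing in for catalog errors.
final class FunctionNotFound(msg: String) extends RuntimeException(msg)
final class NamespaceNotFound(msg: String) extends RuntimeException(msg)
final class PermissionDenied(msg: String) extends RuntimeException(msg)

object LookupSketch {
  // Broad version: every non-fatal failure is reported as "not found".
  def lookupBroad[A](lookup: => A): Option[A] =
    try Some(lookup) catch { case NonFatal(_) => None }

  // Narrow version: only the explicit not-found errors become None;
  // any other exception propagates to the caller.
  def lookupNarrow[A](lookup: => A): Option[A] =
    try Some(lookup) catch {
      case _: FunctionNotFound | _: NamespaceNotFound => None
    }
}
```

With the broad handler, a `PermissionDenied` failure is indistinguishable from a missing function. With the narrow handler it surfaces as an exception, which matches the behaviour change described for `lookupFunctionType`.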

### Why are the changes needed?

These are the cleanup items called out on SPARK-56681 from the post-merge source review of SPARK-56605. They eliminate dead code paths, plug user-visible bugs (force-loading a misconfigured default catalog on column resolution; clobbering pinned session configs; swallowing real catalog errors as `UNRESOLVED_ROUTINE`), remove the asymmetry between DDL and DML on session variables, and make `UNRESOLVED_VARIABLE` self-consistent with the other "not found" errors.

### Does this PR introduce _any_ user-facing change?

Yes.

- **`UNRESOLVED_VARIABLE.searchPath`** is now rendered as a bracketed list. For DML lookups (`SET VAR`, `FETCH ... INTO`), the list reflects the actual SQL PATH that was consulted instead of a hardcoded `SYSTEM.SESSION`. For DDL name validation (`DECLARE` / `DROP` with a non-session namespace), the list is `[`` `system`.`session` ``]` since PATH does not apply.
- **`UNRESOLVED_VARIABLE`** now always carries a `queryContext` that highlights just the offending variable identifier (e.g. `"builtin.var1"`, `"ses.var1"`), not the whole `DECLARE` / `SET VAR` statement.
- **`DROP TEMPORARY VARIABLE`** no longer raises `UNRESOLVED_VARIABLE` when the SQL PATH does not contain `system.session`. DDL on session variables ignores PATH, matching the existing behaviour of `DECLARE OR REPLACE VARIABLE`.
- **`lookupFunctionType`** no longer swallows non–`NotFound` errors. A catalog reporting `PERMISSION_DENIED` (or similar) for a function lookup now propagates instead of silently producing `UNRESOLVED_ROUTINE`.
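
To make the bracketed `searchPath` shape concrete, here is a minimal sketch of rendering `Seq[Seq[String]]` path entries as a bracketed list of backtick-quoted identifiers. `SearchPathSketch` is a hypothetical helper, and the `", "` separator between entries is an assumption; the exact formatting used by Spark is not confirmed by this description.

```scala
object SearchPathSketch {
  // Quote one identifier part with backticks, doubling embedded backticks.
  def quote(part: String): String = "`" + part.replace("`", "``") + "`"

  // Render each entry as a dotted, quoted name and wrap the list in brackets.
  def render(entries: Seq[Seq[String]]): String =
    entries.map(_.map(quote).mkString(".")).mkString("[", ", ", "]")
}
```

For example, `SearchPathSketch.render(Seq(Seq("system", "session")))` produces the single-entry form shown in the first bullet above.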

### How was this patch tested?

- Added `sql-session-variables.sql` regression test for the struct-variable field-vs-qualified ambiguity (`DECLARE VARIABLE session STRUCT<a INT>` → `SELECT session.a` succeeds → `DROP` → `SELECT session.a` falls through to `UNRESOLVED_COLUMN`).
- Updated `SetPathSuite`: DECLARE / SET VAR / DROP cycle under a non-default PATH; bonus test asserts the actual rendered search path and the variable-identifier `queryContext`.
- Updated `SqlScriptingExecutionSuite` for the new bracketed `searchPath` and identifier-pinned `queryContext`.
- Regenerated `sql-session-variables.sql.out` for the new error shape.
- Added `resolutionPathEntriesForAnalysis` stubs to mocked `CatalogManager` instances in `PlanResolutionSuite`, `AlignAssignmentsSuiteBase`, and `TableLookupCacheSuite`.
- Ran focused suites locally; all pass:
  - `build/sbt 'sql/testOnly *SetPathSuite *SqlScriptingExecutionSuite *ExecuteImmediateEndToEndSuite'`
  - `build/sbt 'sql/testOnly *SimpleSQLViewSuite *SQLFunctionSuite'`
  - `build/sbt 'sql/testOnly *PlanResolutionSuite *UpdateTableAlignAssignmentsSuite *MergeIntoTableAlignAssignmentsSuite'`
  - `build/sbt 'catalyst/testOnly *TableLookupCacheSuite *AnalysisSuite *AnalysisErrorSuite *LookupFunctionsSuite'`
  - `build/sbt 'sql/testOnly *FunctionQualificationSuite *RelationQualificationSuite *DataSourceV2FunctionSuite'`
  - `build/sbt 'sql/testOnly *SQLQuerySuite'`
  - `build/sbt 'connect/testOnly *ProtoToParsedPlanTestSuite'`
  - `build/sbt 'sql/testOnly *SQLQueryTestSuite -- -z sql-session-variables.sql'`
  - Full `org.apache.spark.sql.catalyst.analysis.*`, `org.apache.spark.sql.catalyst.parser.*`, and `org.apache.spark.sql.analysis.resolver.*` suites.
- `scalastyle` and `scalafmt` clean across catalyst, sql, and connect modules.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor Claude Opus 4.7

Closes apache#55647 from srielau/SPARK-56681-patch-clean-up.

Authored-by: Serge Rielau <serge@rielau.com>
Signed-off-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>