From f59a9d47a9b6a82f724af03089867cd9d262fc7e Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 26 Sep 2023 10:46:03 +0800 Subject: [PATCH 1/2] test --- .../scala/org/apache/spark/sql/Dataset.scala | 25 +++++-------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0cc037b157e07..1900c5d46abf9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1441,7 +1441,7 @@ class Dataset[T] private[sql]( if (sqlContext.conf.supportQuotedRegexColumnName) { colRegex(colName) } else { - Column(addDataFrameIdToCol(resolve(colName))) + Column(addPlanId(UnresolvedAttribute.quotedString(colName))) } } @@ -1457,23 +1457,10 @@ class Dataset[T] private[sql]( def metadataColumn(colName: String): Column = Column(queryExecution.analyzed.getMetadataAttributeByName(colName)) - // Attach the dataset id and column position to the column reference, so that we can detect - // ambiguous self-join correctly. See the rule `DetectAmbiguousSelfJoin`. - // This must be called before we return a `Column` that contains `AttributeReference`. - // Note that, the metadata added here are only available in the analyzer, as the analyzer rule - // `DetectAmbiguousSelfJoin` will remove it. - private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = { - val newExpr = expr transform { - case a: AttributeReference - if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) => - val metadata = new MetadataBuilder() - .withMetadata(a.metadata) - .putLong(Dataset.DATASET_ID_KEY, id) - .putLong(Dataset.COL_POS_KEY, logicalPlan.output.indexWhere(a.semanticEquals)) - .build() - a.withMetadata(metadata) - } - newExpr.asInstanceOf[NamedExpression] + private def addPlanId(expr: Expression): Expression = { + // reuse existing DATASET_ID_KEY as the PLAN_ID + expr.setTagValue(LogicalPlan.PLAN_ID_TAG, id) + expr } /** @@ -1489,7 +1476,7 @@ class Dataset[T] private[sql]( case ParserUtils.qualifiedEscapedIdentifier(nameParts, columnNameRegex) => Column(UnresolvedRegex(columnNameRegex, Some(nameParts), caseSensitive)) case _ => - Column(addDataFrameIdToCol(resolve(colName))) + Column(addPlanId(UnresolvedAttribute.quotedString(colName))) } } From 9d67eb52ea39c3101611c04c137b8170203ae83c Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Mon, 9 Oct 2023 10:12:20 +0800 Subject: [PATCH 2/2] address comments --- .../scala/org/apache/spark/sql/Dataset.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 1900c5d46abf9..d747e1534e3ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1441,7 +1441,7 @@ class Dataset[T] private[sql]( if (sqlContext.conf.supportQuotedRegexColumnName) { colRegex(colName) } else { - Column(addPlanId(UnresolvedAttribute.quotedString(colName))) + createColumnWithPlanId(colName) } } @@ -1457,12 +1457,6 @@ class Dataset[T] private[sql]( def metadataColumn(colName: String): Column = Column(queryExecution.analyzed.getMetadataAttributeByName(colName)) - private def addPlanId(expr: Expression): Expression = { - // reuse existing DATASET_ID_KEY as the PLAN_ID - expr.setTagValue(LogicalPlan.PLAN_ID_TAG, id) - expr - } - /** * Selects column based on the column name specified as a regex and returns it as [[Column]]. * @group untypedrel @@ -1476,10 +1470,20 @@ class Dataset[T] private[sql]( case ParserUtils.qualifiedEscapedIdentifier(nameParts, columnNameRegex) => Column(UnresolvedRegex(columnNameRegex, Some(nameParts), caseSensitive)) case _ => - Column(addPlanId(UnresolvedAttribute.quotedString(colName))) + createColumnWithPlanId(colName) } } + private def createColumnWithPlanId(colName: String) = { + val expr = resolve(colName) match { + case attr: AttributeReference => UnresolvedAttribute(Seq(attr.name)) + case _ => UnresolvedAttribute.quotedString(colName) + } + // reuse existing DATASET_ID_KEY as the PLAN_ID + expr.setTagValue(LogicalPlan.PLAN_ID_TAG, id) + Column(expr) + } + /** * Returns a new Dataset with an alias set. *