From 7029e891ba25a026a8daf2664180166ee387bba5 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 28 Oct 2016 11:07:25 +0800 Subject: [PATCH 1/5] [SPARK-18137][SQL]Fix RewriteDistinctAggregates UnresolvedException when the UDAF has a foldable TypeCheck --- .../optimizer/RewriteDistinctAggregates.scala | 33 +++++++++++++------ .../sql/hive/execution/HiveUDFSuite.scala | 10 ++++++ 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index d6a39ecf53b86..fcb967f03a1ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -142,11 +142,19 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Setup unique distinct aggregate children. val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct - val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair) - val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2) + val distinctAggChildFoldable = distinctAggChildren.filter(_.foldable) + // 1.only unfoldable child should be expand + // 2.if foldable child mapped to AttributeRefference using expressionAttributePair, + // the udaf function(such as ApproximatePercentile) + // which has a foldable TypeCheck will failed,because AttributeRefference is unfoldable + val distinctAggChildUnFoldableAttrMap = distinctAggChildren + .filter(!_.foldable).map(expressionAttributePair) + + val distinctAggChildrenUnFoldableAttrs = distinctAggChildUnFoldableAttrMap.map(_._2) // Setup expand & aggregate operators for distinct aggregate expressions. 
- val distinctAggChildAttrLookup = distinctAggChildAttrMap.toMap + val distinctAggChildAttrLookup = (distinctAggChildUnFoldableAttrMap + ++ distinctAggChildFoldable.map(c => c -> c)).toMap val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map { case ((group, expressions), i) => val id = Literal(i + 1) @@ -172,11 +180,15 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Setup expand for the 'regular' aggregate expressions. val regularAggExprs = aggExpressions.filter(!_.isDistinct) val regularAggChildren = regularAggExprs.flatMap(_.aggregateFunction.children).distinct - val regularAggChildAttrMap = regularAggChildren.map(expressionAttributePair) - + val regularAggChildFoldable = regularAggChildren.filter(_.foldable) + val regularAggChildUnFoldable = regularAggChildren.filter(!_.foldable) + val regularAggChildUnFoldableAttrMap = regularAggChildUnFoldable + .map(expressionAttributePair) + val regularAggChildUnFoldableAttrs = regularAggChildUnFoldableAttrMap.map(_._2) // Setup aggregates for 'regular' aggregate expressions. val regularGroupId = Literal(0) - val regularAggChildAttrLookup = regularAggChildAttrMap.toMap + val regularAggChildAttrLookup = (regularAggChildUnFoldableAttrMap + ++ regularAggChildFoldable.map(c => c -> c)).toMap val regularAggOperatorMap = regularAggExprs.map { e => // Perform the actual aggregation in the initial aggregate. val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrLookup) @@ -207,13 +219,13 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { Seq(a.groupingExpressions ++ distinctAggChildren.map(nullify) ++ Seq(regularGroupId) ++ - regularAggChildren) + regularAggChildUnFoldable) } else { Seq.empty[Seq[Expression]] } // Construct the distinct aggregate input projections. 
- val regularAggNulls = regularAggChildren.map(nullify) + val regularAggNulls = regularAggChildUnFoldable.map(nullify) val distinctAggProjections = distinctAggOperatorMap.map { case (projection, _) => a.groupingExpressions ++ @@ -224,12 +236,13 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Construct the expand operator. val expand = Expand( regularAggProjection ++ distinctAggProjections, - groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2), + groupByAttrs ++ distinctAggChildrenUnFoldableAttrs ++ Seq(gid) + ++ regularAggChildUnFoldableAttrs, a.child) // Construct the first aggregate operator. This de-duplicates the all the children of // distinct operators, and applies the regular aggregate operators. - val firstAggregateGroupBy = groupByAttrs ++ distinctAggChildAttrs :+ gid + val firstAggregateGroupBy = groupByAttrs ++ distinctAggChildrenUnFoldableAttrs :+ gid val firstAggregate = Aggregate( firstAggregateGroupBy, firstAggregateGroupBy ++ regularAggOperatorMap.map(_._2), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index f690035c845f7..85a3277bfc985 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -150,6 +150,16 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("Generic UDAF aggregates") { + checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999))" + + ", count(distinct key),sum(distinct key) FROM src LIMIT 1"), + sql("SELECT max(key), count(distinct key),sum(distinct key) FROM src LIMIT 1") + .collect().toSeq) + + checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.09999 + 0.9))" + + ", count(distinct key),sum(distinct key),1 FROM src LIMIT 1"), + sql("SELECT max(key), count(distinct key),sum(distinct 
key), 1 FROM src LIMIT 1") + .collect().toSeq) + checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999D)) FROM src LIMIT 1"), sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq) From 8a6dd8daf11f7a0c29b3afc04706ccddc390a1bf Mon Sep 17 00:00:00 2001 From: root Date: Mon, 7 Nov 2016 15:20:29 +0800 Subject: [PATCH 2/5] If a distinct aggregate has only foldable children, expand the first child; if it has unfoldable children, expand only the unfoldable children --- .../optimizer/RewriteDistinctAggregates.scala | 83 ++++++++++--------- .../sql/hive/execution/HiveUDFSuite.scala | 20 +++-- 2 files changed, 56 insertions(+), 47 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index fcb967f03a1ee..2ac8dca126073 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPl import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.IntegerType -/** +/* * This rule rewrites an aggregate query with distinct aggregations into an expanded double * aggregation in which the regular aggregation expressions and every distinct clause is aggregated * in a separate group. The results are then combined in a second aggregate. @@ -115,9 +115,19 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { } // Extract distinct aggregate expressions. 
- val distinctAggGroups = aggExpressions - .filter(_.isDistinct) - .groupBy(_.aggregateFunction.children.toSet) + val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy{ + e => + if (e.aggregateFunction.children.exists(!_.foldable)) { + // Only expand the unfoldable children + e.aggregateFunction.children.filter(!_.foldable).toSet + } else { + // If aggregateFunction's children are all foldable + // we must expand at least one of the children (here we take the first child), + // or If we don't, we will get the wrong result, for example: + // count(distinct 1) will be explained to count(1) after the rewrite function. + e.aggregateFunction.children.take(1).toSet + } + } // Check if the aggregates contains functions that do not support partial aggregation. val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial) @@ -134,27 +144,19 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Functions used to modify aggregate functions and their inputs. def evalWithinGroup(id: Literal, e: Expression) = If(EqualTo(gid, id), e, nullify(e)) - def patchAggregateFunctionChildren( - af: AggregateFunction)( - attrs: Expression => Expression): AggregateFunction = { - af.withNewChildren(af.children.map(attrs)).asInstanceOf[AggregateFunction] + def patchAggregateFunctionChildren(af: AggregateFunction)( + attrs: Expression => Option[Expression]): AggregateFunction = { + val newChildren = af.children.map(c => attrs(c).getOrElse(c)) + af.withNewChildren(newChildren).asInstanceOf[AggregateFunction] } // Setup unique distinct aggregate children. 
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct - val distinctAggChildFoldable = distinctAggChildren.filter(_.foldable) - // 1.only unfoldable child should be expand - // 2.if foldable child mapped to AttributeRefference using expressionAttributePair, - // the udaf function(such as ApproximatePercentile) - // which has a foldable TypeCheck will failed,because AttributeRefference is unfoldable - val distinctAggChildUnFoldableAttrMap = distinctAggChildren - .filter(!_.foldable).map(expressionAttributePair) - - val distinctAggChildrenUnFoldableAttrs = distinctAggChildUnFoldableAttrMap.map(_._2) + val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair) + val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2) // Setup expand & aggregate operators for distinct aggregate expressions. - val distinctAggChildAttrLookup = (distinctAggChildUnFoldableAttrMap - ++ distinctAggChildFoldable.map(c => c -> c)).toMap + val distinctAggChildAttrLookup = distinctAggChildAttrMap.toMap val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map { case ((group, expressions), i) => val id = Literal(i + 1) @@ -169,7 +171,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { val operators = expressions.map { e => val af = e.aggregateFunction val naf = patchAggregateFunctionChildren(af) { x => - evalWithinGroup(id, distinctAggChildAttrLookup(x)) + distinctAggChildAttrLookup.get(x).map(evalWithinGroup(id, _)) } (e, e.copy(aggregateFunction = naf, isDistinct = false)) } @@ -178,20 +180,20 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { } // Setup expand for the 'regular' aggregate expressions. 
- val regularAggExprs = aggExpressions.filter(!_.isDistinct) - val regularAggChildren = regularAggExprs.flatMap(_.aggregateFunction.children).distinct - val regularAggChildFoldable = regularAggChildren.filter(_.foldable) - val regularAggChildUnFoldable = regularAggChildren.filter(!_.foldable) - val regularAggChildUnFoldableAttrMap = regularAggChildUnFoldable - .map(expressionAttributePair) - val regularAggChildUnFoldableAttrs = regularAggChildUnFoldableAttrMap.map(_._2) + // only expand unfoldable children + val regularAggExprs = aggExpressions + .filter(e => !e.isDistinct && e.children.exists(!_.foldable)) + val regularAggChildren = regularAggExprs + .flatMap(_.aggregateFunction.children.filter(!_.foldable)) + .distinct + val regularAggChildAttrMap = regularAggChildren.map(expressionAttributePair) + // Setup aggregates for 'regular' aggregate expressions. val regularGroupId = Literal(0) - val regularAggChildAttrLookup = (regularAggChildUnFoldableAttrMap - ++ regularAggChildFoldable.map(c => c -> c)).toMap + val regularAggChildAttrLookup = regularAggChildAttrMap.toMap val regularAggOperatorMap = regularAggExprs.map { e => // Perform the actual aggregation in the initial aggregate. - val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrLookup) + val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrLookup.get) val operator = Alias(e.copy(aggregateFunction = af), e.sql)() // Select the result of the first aggregate in the last aggregate. @@ -219,13 +221,13 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { Seq(a.groupingExpressions ++ distinctAggChildren.map(nullify) ++ Seq(regularGroupId) ++ - regularAggChildUnFoldable) + regularAggChildren) } else { Seq.empty[Seq[Expression]] } // Construct the distinct aggregate input projections. 
- val regularAggNulls = regularAggChildUnFoldable.map(nullify) + val regularAggNulls = regularAggChildren.map(nullify) val distinctAggProjections = distinctAggOperatorMap.map { case (projection, _) => a.groupingExpressions ++ @@ -236,13 +238,12 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Construct the expand operator. val expand = Expand( regularAggProjection ++ distinctAggProjections, - groupByAttrs ++ distinctAggChildrenUnFoldableAttrs ++ Seq(gid) - ++ regularAggChildUnFoldableAttrs, + groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2), a.child) // Construct the first aggregate operator. This de-duplicates the all the children of // distinct operators, and applies the regular aggregate operators. - val firstAggregateGroupBy = groupByAttrs ++ distinctAggChildrenUnFoldableAttrs :+ gid + val firstAggregateGroupBy = groupByAttrs ++ distinctAggChildAttrs :+ gid val firstAggregate = Aggregate( firstAggregateGroupBy, firstAggregateGroupBy ++ regularAggOperatorMap.map(_._2), @@ -250,8 +251,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Construct the second aggregate val transformations: Map[Expression, Expression] = - (distinctAggOperatorMap.flatMap(_._2) ++ - regularAggOperatorMap.map(e => (e._1, e._3))).toMap + (distinctAggOperatorMap.flatMap(_._2) ++ + regularAggOperatorMap.map(e => (e._1, e._3))).toMap val patchedAggExpressions = a.aggregateExpressions.map { e => e.transformDown { @@ -274,9 +275,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { private def nullify(e: Expression) = Literal.create(null, e.dataType) private def expressionAttributePair(e: Expression) = - // We are creating a new reference here instead of reusing the attribute in case of a - // NamedExpression. 
This is done to prevent collisions between distinct and regular aggregate - // children, in this case attribute reuse causes the input of the regular aggregate to bound to - // the (nulled out) input of the distinct aggregate. + // We are creating a new reference here instead of reusing the attribute in case of a + // NamedExpression. This is done to prevent collisions between distinct and regular aggregate + // children, in this case attribute reuse causes the input of the regular aggregate to bound to + // the (nulled out) input of the distinct aggregate. e -> AttributeReference(e.sql, e.dataType, nullable = true)() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 85a3277bfc985..e25a97414d68a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -150,14 +150,22 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("Generic UDAF aggregates") { - checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999))" + - ", count(distinct key),sum(distinct key) FROM src LIMIT 1"), - sql("SELECT max(key), count(distinct key),sum(distinct key) FROM src LIMIT 1") + checkAnswer(sql("SELECT percentile_approx(2, 0.99999), " + + "sum(distinct 1), count(distinct 1,2,3,4) FROM src LIMIT 1"), + sql("SELECT 2, 1, 1 FROM src LIMIT 1") + .collect().toSeq) + + checkAnswer(sql("SELECT ceiling(percentile_approx(distinct key, 0.99999))" + + ", count(distinct key), sum(distinct key), " + + "count(distinct 1), sum(distinct 1), sum(1) FROM src LIMIT 1"), + sql("SELECT max(key), count(distinct key), sum(distinct key)," + + " 1, 1, sum(1) FROM src LIMIT 1") .collect().toSeq) - checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.09999 + 0.9))" + - ", count(distinct key),sum(distinct key),1 FROM src 
LIMIT 1"), - sql("SELECT max(key), count(distinct key),sum(distinct key), 1 FROM src LIMIT 1") + checkAnswer(sql("SELECT ceiling(percentile_approx(distinct key, 0.9 + 0.09999))" + + ", count(distinct key), sum(distinct key), " + + "count(distinct 1), sum(distinct 1), sum(1) FROM src LIMIT 1"), + sql("SELECT max(key), count(distinct key), sum(distinct key), 1, 1, sum(1) FROM src LIMIT 1") .collect().toSeq) checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999D)) FROM src LIMIT 1"), From c5b3a3d2c81ebaf86ed08682c4d30de0120a2850 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 7 Nov 2016 17:49:01 +0800 Subject: [PATCH 3/5] fix some code style --- .../optimizer/RewriteDistinctAggregates.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 2ac8dca126073..24f4e032ebfaf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPl import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.IntegerType -/* +/** * This rule rewrites an aggregate query with distinct aggregations into an expanded double * aggregation in which the regular aggregation expressions and every distinct clause is aggregated * in a separate group. The results are then combined in a second aggregate. 
@@ -125,6 +125,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // we must expand at least one of the children (here we take the first child), // or If we don't, we will get the wrong result, for example: // count(distinct 1) will be explained to count(1) after the rewrite function. + // Generally, the distinct aggregateFunction should not run + // foldable TypeCheck for the first child. e.aggregateFunction.children.take(1).toSet } } @@ -144,8 +146,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Functions used to modify aggregate functions and their inputs. def evalWithinGroup(id: Literal, e: Expression) = If(EqualTo(gid, id), e, nullify(e)) - def patchAggregateFunctionChildren(af: AggregateFunction)( - attrs: Expression => Option[Expression]): AggregateFunction = { + def patchAggregateFunctionChildren( + af: AggregateFunction)( + attrs: Expression => Option[Expression]): AggregateFunction = { val newChildren = af.children.map(c => attrs(c).getOrElse(c)) af.withNewChildren(newChildren).asInstanceOf[AggregateFunction] } @@ -251,8 +254,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Construct the second aggregate val transformations: Map[Expression, Expression] = - (distinctAggOperatorMap.flatMap(_._2) ++ - regularAggOperatorMap.map(e => (e._1, e._3))).toMap + (distinctAggOperatorMap.flatMap(_._2) ++ + regularAggOperatorMap.map(e => (e._1, e._3))).toMap val patchedAggExpressions = a.aggregateExpressions.map { e => e.transformDown { @@ -275,9 +278,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { private def nullify(e: Expression) = Literal.create(null, e.dataType) private def expressionAttributePair(e: Expression) = - // We are creating a new reference here instead of reusing the attribute in case of a - // NamedExpression. 
This is done to prevent collisions between distinct and regular aggregate - // children, in this case attribute reuse causes the input of the regular aggregate to bound to - // the (nulled out) input of the distinct aggregate. + // We are creating a new reference here instead of reusing the attribute in case of a + // NamedExpression. This is done to prevent collisions between distinct and regular aggregate + // children, in this case attribute reuse causes the input of the regular aggregate to bound to + // the (nulled out) input of the distinct aggregate. e -> AttributeReference(e.sql, e.dataType, nullable = true)() } From 67fc72dc1e816566cd23234e52592e09e48fbe2c Mon Sep 17 00:00:00 2001 From: root Date: Mon, 7 Nov 2016 22:21:54 +0800 Subject: [PATCH 4/5] fix some code style --- .../optimizer/RewriteDistinctAggregates.scala | 7 +-- .../sql/hive/execution/HiveUDFSuite.scala | 43 +++++++++++++------ 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 24f4e032ebfaf..5ca3fe981b224 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -115,11 +115,12 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { } // Extract distinct aggregate expressions. 
- val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy{ + val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e => - if (e.aggregateFunction.children.exists(!_.foldable)) { + val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet + if (unfoldableChildren.nonEmpty) { // Only expand the unfoldable children - e.aggregateFunction.children.filter(!_.foldable).toSet + unfoldableChildren } else { // If aggregateFunction's children are all foldable // we must expand at least one of the children (here we take the first child), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index e25a97414d68a..48adc833f4b22 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -150,21 +150,38 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("Generic UDAF aggregates") { - checkAnswer(sql("SELECT percentile_approx(2, 0.99999), " + - "sum(distinct 1), count(distinct 1,2,3,4) FROM src LIMIT 1"), - sql("SELECT 2, 1, 1 FROM src LIMIT 1") - .collect().toSeq) - checkAnswer(sql("SELECT ceiling(percentile_approx(distinct key, 0.99999))" + - ", count(distinct key), sum(distinct key), " + - "count(distinct 1), sum(distinct 1), sum(1) FROM src LIMIT 1"), - sql("SELECT max(key), count(distinct key), sum(distinct key)," + - " 1, 1, sum(1) FROM src LIMIT 1") - .collect().toSeq) + checkAnswer(sql( + """ + |SELECT percentile_approx(2, 0.99999), + | sum(distinct 1), + | count(distinct 1,2,3,4) FROM src LIMIT 1 + """.stripMargin), sql("SELECT 2, 1, 1 FROM src LIMIT 1").collect().toSeq) + + checkAnswer(sql( + """ + |SELECT ceiling(percentile_approx(distinct key, 0.99999)), + | count(distinct key), + | sum(distinct key), + | count(distinct 1), + | sum(distinct 1), + | 
sum(1) FROM src LIMIT 1 + """.stripMargin), + sql( + """ + |SELECT max(key), + | count(distinct key), + | sum(distinct key), + | 1, 1, sum(1) FROM src LIMIT 1 + """.stripMargin).collect().toSeq) - checkAnswer(sql("SELECT ceiling(percentile_approx(distinct key, 0.9 + 0.09999))" + - ", count(distinct key), sum(distinct key), " + - "count(distinct 1), sum(distinct 1), sum(1) FROM src LIMIT 1"), + checkAnswer(sql( + """ + |SELECT ceiling(percentile_approx(distinct key, 0.9 + 0.09999)), + | count(distinct key), sum(distinct key), + | count(distinct 1), sum(distinct 1), + | sum(1) FROM src LIMIT 1 + """.stripMargin), sql("SELECT max(key), count(distinct key), sum(distinct key), 1, 1, sum(1) FROM src LIMIT 1") .collect().toSeq) From 6e58167153234f940f71dbe95be8efa0818e6287 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 8 Nov 2016 14:17:39 +0800 Subject: [PATCH 5/5] fix a code style --- .../sql/catalyst/optimizer/RewriteDistinctAggregates.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 5ca3fe981b224..cd8912f793f89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -115,8 +115,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { } // Extract distinct aggregate expressions. - val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { - e => + val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e => val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet if (unfoldableChildren.nonEmpty) { // Only expand the unfoldable children