[SPARK-25746][SQL] Refactoring ExpressionEncoder to get rid of flat flag#22749
[SPARK-25746][SQL] Refactoring ExpressionEncoder to get rid of flat flag#22749viirya wants to merge 26 commits into
Conversation
d755e84 to
84f3ce0
Compare
|
Test build #97459 has finished for PR 22749 at commit
|
|
Test build #97460 has finished for PR 22749 at commit
|
|
Test build #97479 has finished for PR 22749 at commit
|
|
Test build #97480 has finished for PR 22749 at commit
|
|
retest this please. |
|
Test build #97485 has finished for PR 22749 at commit
|
| // We convert the not-serializable TypeTag into StructType and ClassTag. | ||
| val mirror = ScalaReflection.mirror | ||
| val tpe = typeTag[T].in(mirror).tpe | ||
| val tpe = ScalaReflection.localTypeOf[T] |
There was a problem hiding this comment.
why change it from typeTag[T].in(mirror).tpe?
There was a problem hiding this comment.
localTypeOf is actually doing the same thing. I think it is better to use ScalaReflection for such thing.
There was a problem hiding this comment.
localTypeOf has a dealias at the end.
There was a problem hiding this comment.
I think it should be fine, but let me revert this change first.
| * name/positional binding is preserved. | ||
| */ | ||
| def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { | ||
| if (encoders.length > 22) { |
There was a problem hiding this comment.
can we do it in a separated PR with a test?
|
|
||
| val newSerializer = enc.serializer.map(_.transformUp { | ||
| val newSerializer = enc.objSerializer.transformUp { | ||
| case b: BoundReference if b == originalInputObject => newInputObject |
There was a problem hiding this comment.
Since there is only one distinct BoundReference, we can just write case b: BoundReference => newInputObject
| @@ -103,75 +88,61 @@ object ExpressionEncoder { | |||
| * name/positional binding is preserved. | |||
| */ | |||
| def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { | |||
There was a problem hiding this comment.
cool, this method is simplified a lot with the new abstraction.
| AssertNotNull(r, Seq("top level Product or row object")) | ||
| } | ||
| nullSafeSerializer match { | ||
| case If(_, _, s: CreateNamedStruct) => s |
There was a problem hiding this comment.
let's also make sure the if condition is IsNull, which better explains why we strip it(it can't be null)
|
|
||
| if (flat) require(serializer.size == 1) | ||
| /** | ||
| * A set of expressions, one for each top-level field that can be used to |
| // The schema after converting `T` to a Spark SQL row. This schema is dependent on the given | ||
| // serialier. | ||
| val schema: StructType = StructType(serializer.map { s => | ||
| StructField(s.name, s.dataType, s.nullable) |
There was a problem hiding this comment.
can we call dataType before serializer is analyzed?
There was a problem hiding this comment.
nvm, serializer don't need analysis
|
I like this idea! waiting for tests pass |
|
Test build #97539 has finished for PR 22749 at commit
|
|
hmm, it still has conflict... |
|
Let me rebase again. |
|
Test build #97964 has finished for PR 22749 at commit
|
|
retest this please. |
|
|
||
| // The input object to `ExpressionEncoder` is located at first column of an row. | ||
| val inputObject = BoundReference(0, dataTypeFor(tpe), | ||
| nullable = !cls.isPrimitive) |
There was a problem hiding this comment.
we just check isPrimitive of the given cls, can we check tpe directly?
There was a problem hiding this comment.
Yes, we can check tpe.typeSymbol.asClass.isPrimitive instead.
There was a problem hiding this comment.
good, then we don't need cls as a parameter.
|
|
||
| /** Helper for extracting internal fields from a case class. */ | ||
| /** | ||
| * Returns an expression for serializing the value of an input expression into Spark SQL |
There was a problem hiding this comment.
do we really need to duplicate the doc in this private method?
There was a problem hiding this comment.
I did simplify a lot of it.
| val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema) | ||
| val deserializer = deserializerFor(schema) | ||
| val serializer = serializerFor(inputObject, schema) | ||
| val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema) |
There was a problem hiding this comment.
in ScalaReflection, we create GetColumnByOrdinal in deserializeFor, shall we follow it here?
There was a problem hiding this comment.
Ah, we need to access serializer.dataType here. So if we want to create GetColumnByOrdinal in deserializeFor, we need to pass this data type too. What do you think?
There was a problem hiding this comment.
ah i see, then let's leave it.
| // side, in cases like outer-join. | ||
| val left = { | ||
| val combined = if (this.exprEnc.flat) { | ||
| val combined = if (!this.exprEnc.objSerializer.dataType.isInstanceOf[StructType]) { |
There was a problem hiding this comment.
shall we create a method in ExpressionEncoder for this check?
|
Test build #97967 has finished for PR 22749 at commit
|
| * A sequence of expressions, one for each top-level field that can be used to | ||
| * extract the values from a raw object into an [[InternalRow]]: | ||
| * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`. | ||
| * 2. For other cases, we create a struct to wrap the `serializer`. |
There was a problem hiding this comment.
Let's make these 2 comments more precise
1. If `serializer` encodes a raw object to a struct, strip the outer if-IsNull and get the CreateNamedStruct
2. For other cases, wrap the single serializer with CreateNamedStruct
| assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, Int)]) == 1) | ||
| assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, java.lang.Integer)]) == 0) | ||
| assert(numberOfCheckedArguments( | ||
| deserializerForType(ScalaReflection.localTypeOf[(Double, Double)])) == 2) |
There was a problem hiding this comment.
shall we create a deserializerFor method in this test suite to save some code diff?
|
LGTM except 2 minor comments |
|
Test build #97969 has finished for PR 22749 at commit
|
| test("SPARK-22442: Generate correct field names for special characters") { | ||
| val serializer = serializerFor[SpecialCharAsFieldData](BoundReference( | ||
| 0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false)) | ||
| val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData]) |
There was a problem hiding this comment.
can we replace all the serializerForType with serializerFor in this suite?
There was a problem hiding this comment.
Do you mean to create a method serializerFor in this suite? Or replace serializerForType with ScalaReflection.serializerFor?
There was a problem hiding this comment.
like deserializerFor in this suite, let's also create a serializerFor
|
Test build #97971 has finished for PR 22749 at commit
|
|
Test build #97991 has finished for PR 22749 at commit
|
| } | ||
| nullSafeSerializer match { | ||
| case If(_: IsNull, _, s: CreateNamedStruct) => s | ||
| case s: CreateNamedStruct => s |
There was a problem hiding this comment.
oh, good catch! I think this is redundant pattern.
There was a problem hiding this comment.
this is minor, we can update it in another PR. We don't need to wait for another jenkins QA round.
|
thanks, merging to master! |
|
Thanks @cloud-fan |
## What changes were proposed in this pull request? This is inspired during implementing apache#21732. For now `ScalaReflection` needs to consider how `ExpressionEncoder` uses generated serializers and deserializers. And `ExpressionEncoder` has a weird `flat` flag. After discussion with cloud-fan, it seems to be better to refactor `ExpressionEncoder`. It should make SPARK-24762 easier to do. To summarize the proposed changes: 1. `serializerFor` and `deserializerFor` return expressions for serializing/deserializing an input expression for a given type. They are private and should not be called directly. 2. `serializerForType` and `deserializerForType` returns an expression for serializing/deserializing for an object of type T to/from Spark SQL representation. It assumes the input object/Spark SQL representation is located at ordinal 0 of a row. So in other words, `serializerForType` and `deserializerForType` return expressions for atomically serializing/deserializing JVM object to/from Spark SQL value. A serializer returned by `serializerForType` will serialize an object at `row(0)` to a corresponding Spark SQL representation, e.g. primitive type, array, map, struct. A deserializer returned by `deserializerForType` will deserialize an input field at `row(0)` to an object with given type. 3. The construction of `ExpressionEncoder` takes a pair of serializer and deserializer for type `T`. It uses them to create serializer and deserializer for T <-> row serialization. Now `ExpressionEncoder` dones't need to remember if serializer is flat or not. When we need to construct new `ExpressionEncoder` based on existing ones, we only need to change input location in the atomic serializer and deserializer. ## How was this patch tested? Existing tests. Closes apache#22749 from viirya/SPARK-24762-refactor. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request? a followup of apache#22749. When we construct the new serializer in `ExpressionEncoder.tuple`, we don't need to add `if(isnull ...)` check for each field. They are either simple expressions that can propagate null correctly(e.g. `GetStructField(GetColumnByOrdinal(0, schema), index)`), or complex expression that already have the isnull check. ## How was this patch tested? existing tests Closes apache#22898 from cloud-fan/minor. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This is inspired during implementing #21732. For now
ScalaReflectionneeds to consider howExpressionEncoderuses generated serializers and deserializers. AndExpressionEncoderhas a weirdflatflag. After discussion with @cloud-fan, it seems to be better to refactorExpressionEncoder. It should make SPARK-24762 easier to do.To summarize the proposed changes:
serializerForanddeserializerForreturn expressions for serializing/deserializing an input expression for a given type. They are private and should not be called directly.serializerForTypeanddeserializerForTypereturns an expression for serializing/deserializing for an object of type T to/from Spark SQL representation. It assumes the input object/Spark SQL representation is located at ordinal 0 of a row.So in other words,
serializerForTypeanddeserializerForTypereturn expressions for atomically serializing/deserializing JVM object to/from Spark SQL value.A serializer returned by
serializerForTypewill serialize an object atrow(0)to a corresponding Spark SQL representation, e.g. primitive type, array, map, struct.A deserializer returned by
deserializerForTypewill deserialize an input field atrow(0)to an object with given type.ExpressionEncodertakes a pair of serializer and deserializer for typeT. It uses them to create serializer and deserializer for T <-> row serialization. NowExpressionEncoderdones't need to remember if serializer is flat or not. When we need to construct newExpressionEncoderbased on existing ones, we only need to change input location in the atomic serializer and deserializer.How was this patch tested?
Existing tests.