From 3011af5d00021219ae7145f9827667097e145a88 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Fri, 9 Sep 2016 19:59:24 -0700 Subject: [PATCH 1/4] Add Hashing capability equivalent to Hive --- .../org/apache/spark/sql/hive/HiveHash.scala | 145 ++++++++++++++++++ .../apache/spark/sql/hive/HiveHashSuite.scala | 104 +++++++++++++ 2 files changed, 249 insertions(+) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala new file mode 100644 index 0000000000000..3230c868e7bff --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.types.UTF8String + +/** + * Simulates Hive's hashing function at + * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode() in Hive + * + * We should use this hash function for both shuffle and bucket of Hive tables, so that + * we can guarantee shuffle and bucketing have same data distribution + * + * TODO: Support Decimal and date related types + */ +@ExpressionDescription( + usage = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments.") +case class HiveHash(children: Seq[Expression], seed: Int) extends HashExpression[Int] { + def this(arguments: Seq[Expression]) = this(arguments, 42) + + override def dataType: DataType = IntegerType + + override def prettyName: String = "hive-hash" + + override protected def hasherClassName: String = classOf[HiveHash].getName + + override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = { + HiveHashFunction.hash(value, dataType, seed).toInt + } +} + +object HiveHashFunction extends InterpretedHashFunction { + override protected def hashInt(i: Int, seed: Long): Long = { + HiveHasher.hashInt(i, seed.toInt) + } + + override protected def hashLong(l: Long, seed: Long): Long = { + HiveHasher.hashLong(l, seed.toInt) + } + + override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { + HiveHasher.hashUnsafeBytes(base, offset, len, seed.toInt) + } + + override def hash(value: Any, dataType: DataType, seed: Long): Long = { + value match { + case s: UTF8String => + val bytes = s.getBytes + var result: Int = 0 + var i = 0 + while (i < bytes.length) { + result = (result * 31) + bytes(i).toInt + i += 1 + } + result + + + case 
array: ArrayData => + val elementType = dataType match { + case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType + case ArrayType(et, _) => et + } + var result: Int = 0 + var i = 0 + while (i < array.numElements()) { + result = (31 * result) + hash(array.get(i, elementType), elementType, 0).toInt + i += 1 + } + result + + case map: MapData => + val (kt, vt) = dataType match { + case udt: UserDefinedType[_] => + val mapType = udt.sqlType.asInstanceOf[MapType] + mapType.keyType -> mapType.valueType + case MapType(_kt, _vt, _) => _kt -> _vt + } + val keys = map.keyArray() + val values = map.valueArray() + var result: Int = 0 + var i = 0 + while (i < map.numElements()) { + result += hash(keys.get(i, kt), kt, 0).toInt ^ hash(values.get(i, vt), vt, 0).toInt + i += 1 + } + result + + case struct: InternalRow => + val types: Array[DataType] = dataType match { + case udt: UserDefinedType[_] => + udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray + case StructType(fields) => fields.map(_.dataType) + } + + var i = 0 + var result = 0 + val len = struct.numFields + while (i < len) { + result = (31 * result) + hash(struct.get(i, types(i)), types(i), seed).toInt + i += 1 + } + result + + case _ => super.hash(value, dataType, seed) + } + } +} + +object HiveHasher { + def hashInt(input: Int, seed: Int): Int = input + + def hashLong(input: Long, seed: Int): Int = ((input >>> 32) ^ input).toInt + + def hashUnsafeBytes(base: AnyRef, offset: Long, lengthInBytes: Int, seed: Int): Int = { + assert(lengthInBytes >= 0, "lengthInBytes cannot be negative") + + var result: Int = 0 + var i: Int = lengthInBytes - lengthInBytes % 4 + while (i < lengthInBytes) { + result = (result * 31) + Platform.getByte(base, offset + i).toInt + i += 1 + } + result + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala new file mode 100644 index 
0000000000000..3a67901e54837 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.util + +import org.apache.hadoop.hive.serde2.io.ShortWritable +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory._ +import org.apache.hadoop.io._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class HiveHashSuite extends SparkFunSuite { + private val Seed = 0 + + def verifyHashes(pair: (DataType, Any, Writable, ObjectInspector)): Unit = { + val (dataType, value, writableValue, inspector) = pair + val expectedHash = ObjectInspectorUtils.hashCode(writableValue, inspector) + val actualHash = HiveHashFunction.hash(value, dataType, Seed) + assert(expectedHash == actualHash) + } + + test("primitive datatypes") { + val (boolTrue, boolFalse) = (true, false) + val (byte, short, integer, long) = (198.toByte, 5544.toShort, 123457878, 
89945787834323L) + val (float, double) = (12.65F, 1945.9009) + + Seq( + (BooleanType, boolTrue, new BooleanWritable(boolTrue), writableBooleanObjectInspector), + (BooleanType, boolFalse, new BooleanWritable(boolFalse), writableBooleanObjectInspector), + (ByteType, byte, new ByteWritable(byte), writableByteObjectInspector), + (ShortType, short, new ShortWritable(short), writableShortObjectInspector), + (IntegerType, integer, new IntWritable(integer), writableIntObjectInspector), + (LongType, long, new LongWritable(long), writableLongObjectInspector), + (FloatType, float, new FloatWritable(float), writableFloatObjectInspector), + (DoubleType, double, new DoubleWritable(double), writableDoubleObjectInspector) + ).foreach(verifyHashes) + } + + test("UTF8String data") { + val input = "my_test" + val text = new Text(input) + val inspector = writableStringObjectInspector + val expectedHash = ObjectInspectorUtils.hashCode(text, inspector) + + val utf8String = UTF8String.fromString(input) + val actualHash = HiveHashFunction.hash(utf8String, StringType, Seed) + assert(expectedHash == actualHash) + } + + test("array datatype") { + val array = Array(1, 2, 3) + val list = new util.ArrayList[IntWritable]() + array.foreach(item => list.add(new IntWritable(item))) + + val listInspector = + ObjectInspectorFactory.getStandardListObjectInspector(writableIntObjectInspector) + val expectedHash = ObjectInspectorUtils.hashCode(list, listInspector) + + val input = new GenericArrayData(array) + val actualHash = HiveHashFunction.hash(input, ArrayType(IntegerType), Seed) + assert(expectedHash == actualHash) + } + + test("map datatype") { + val keys = Array(1, 2, 3) + val values = Array(10, 20, 30) + val map = new util.LinkedHashMap[IntWritable, IntWritable]() + keys.zip(values).map { + case (k: Int, v: Int) => + map.put(new IntWritable(k), new IntWritable(v)) + } + + val listInspector = + ObjectInspectorFactory.getStandardMapObjectInspector( + writableIntObjectInspector, + 
writableIntObjectInspector) + + val expectedHash = ObjectInspectorUtils.hashCode(map, listInspector) + + val input = new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values)) + val actualHash = HiveHashFunction.hash(input, MapType(IntegerType, IntegerType), Seed) + assert(expectedHash == actualHash) + } +} From 3bed3a1fc7dcfa886f1baa6450afa2b84c66e3f8 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 14 Sep 2016 15:13:54 -0700 Subject: [PATCH 2/4] do codeGen() --- .../apache/spark/unsafe/hash/HiveHasher.java | 66 +++ .../spark/unsafe/hash/HiveHasherSuite.java | 128 ++++++ .../spark/sql/catalyst/expressions/misc.scala | 378 +++++++++++++++--- .../expressions/MiscFunctionsSuite.scala | 3 +- .../org/apache/spark/sql/hive/HiveHash.scala | 145 ------- .../apache/spark/sql/hive/HiveHashSuite.scala | 104 ----- 6 files changed, 517 insertions(+), 307 deletions(-) create mode 100644 common/unsafe/src/main/java/org/apache/spark/unsafe/hash/HiveHasher.java create mode 100644 common/unsafe/src/test/java/org/apache/spark/unsafe/hash/HiveHasherSuite.java delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/HiveHasher.java new file mode 100644 index 0000000000000..af8c8f28960fe --- /dev/null +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/HiveHasher.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.hash; + +import org.apache.spark.unsafe.Platform; + +/** + * Simulates Hive's hashing function at + * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode() + */ +public class HiveHasher { + private final long seed; + + public HiveHasher() { + this.seed = 0; + } + + @Override + public String toString() { + return "HiveHasher"; + } + + public int hashInt(int input) { + return hashInt(input, seed); + } + + public int hashLong(long input) { + return hashLong(input, seed); + } + + public int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { + return hashUnsafeBytes(base, offset, lengthInBytes, seed); + } + + public static int hashInt(int input, long seed) { + return input; + } + + public static int hashLong(long input, long seed) { + return (int) ((input >>> 32) ^ input); + } + + public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, long seed) { + assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; + int result = 0; + for (int i = 0; i < lengthInBytes; i++) { + result = (result * 31) + (int) Platform.getByte(base, offset + i); + } + return result; + } +} diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/HiveHasherSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/HiveHasherSuite.java new file mode 100644 index 0000000000000..d4feefa8b2294 --- /dev/null +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/HiveHasherSuite.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.hash; + +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +public class HiveHasherSuite { + private final static HiveHasher hasher = new HiveHasher(); + + @Test + public void testKnownIntegerInputs() { + int[] inputs = {0, Integer.MIN_VALUE, Integer.MAX_VALUE, 593689054, -189366624}; + for (int input : inputs) { + Assert.assertEquals(input, hasher.hashInt(input)); + } + } + + @Test + public void testKnownLongInputs() { + Assert.assertEquals(0, hasher.hashLong(0L)); + Assert.assertEquals(41, hasher.hashLong(-42L)); + Assert.assertEquals(42, hasher.hashLong(42L)); + Assert.assertEquals(-2147483648, hasher.hashLong(Long.MIN_VALUE)); + Assert.assertEquals(-2147483648, hasher.hashLong(Long.MAX_VALUE)); + } + + @Test + public void testKnownStringAndIntInputs() { + int[] inputs = {84, 19, 8}; + int[] expected = {-823832826, -823835053, 111972242}; + + for (int i = 0; i < inputs.length; i++) { + UTF8String s = UTF8String.fromString("val_" + inputs[i]); + int hash = 
hasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes()); + Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash)); + } + } + + @Test + public void randomizedStressTest() { + int size = 65536; + Random rand = new Random(); + + // A set used to track collision rate. + Set hashcodes = new HashSet<>(); + for (int i = 0; i < size; i++) { + int vint = rand.nextInt(); + long lint = rand.nextLong(); + Assert.assertEquals(hasher.hashInt(vint), hasher.hashInt(vint)); + Assert.assertEquals(hasher.hashLong(lint), hasher.hashLong(lint)); + + hashcodes.add(hasher.hashLong(lint)); + } + + // A very loose bound. + Assert.assertTrue(hashcodes.size() > size * 0.95); + } + + @Test + public void randomizedStressTestBytes() { + int size = 65536; + Random rand = new Random(); + + // A set used to track collision rate. + Set hashcodes = new HashSet<>(); + for (int i = 0; i < size; i++) { + int byteArrSize = rand.nextInt(100) * 8; + byte[] bytes = new byte[byteArrSize]; + rand.nextBytes(bytes); + + Assert.assertEquals( + hasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + hasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + + hashcodes.add(hasher.hashUnsafeBytes( + bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + } + + // A very loose bound. + Assert.assertTrue(hashcodes.size() > size * 0.95); + } + + @Test + public void randomizedStressTestPaddedStrings() { + int size = 64000; + // A set used to track collision rate. 
+ Set hashcodes = new HashSet<>(); + for (int i = 0; i < size; i++) { + int byteArrSize = 8; + byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8); + byte[] paddedBytes = new byte[byteArrSize]; + System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length); + + Assert.assertEquals( + hasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + hasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + + hashcodes.add(hasher.hashUnsafeBytes( + paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + } + + // A very loose bound. + Assert.assertTrue(hashcodes.size() > size * 0.95); + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 92f8fb85fc0e2..568705659b1b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.hash.Murmur3_x86_32 +import org.apache.spark.unsafe.hash.{HiveHasher, Murmur3_x86_32} import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.unsafe.Platform @@ -259,7 +259,7 @@ abstract class HashExpression[E] extends Expression { $childrenHash""") } - private def nullSafeElementHash( + protected def nullSafeElementHash( input: String, index: String, nullable: Boolean, @@ -276,6 +276,97 @@ abstract class HashExpression[E] extends Expression { } } + protected def genHashInt(i: String, hasher: String, result: String): String = + s"$result = $hasher.hashInt($i, $result);" + + protected def genHashLong(l: String, hasher: String, result: 
String): String = + s"$result = $hasher.hashLong($l, $result);" + + protected def genHashBytes(b: String, hasher: String, result: String): String = + s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, $result);" + + protected def genHashBoolean(input: String, hasher: String, result: String): String = + genHashInt(s"$input ? 1 : 0", hasher, result) + + protected def genHashFloat(input: String, hasher: String, result: String): String = + genHashInt(s"Float.floatToIntBits($input)", hasher, result) + + protected def genHashDouble(input: String, hasher: String, result: String): String = + genHashLong(s"Double.doubleToLongBits($input)", hasher, result) + + protected def genHashDecimal( + ctx: CodegenContext, + d: DecimalType, + input: String, + hasher: String, + result: String): String = { + if (d.precision <= Decimal.MAX_LONG_DIGITS) { + genHashLong(s"$input.toUnscaledLong()", hasher, result) + } else { + val bytes = ctx.freshName("bytes") + s""" + final byte[] $bytes = $input.toJavaBigDecimal().unscaledValue().toByteArray(); + ${genHashBytes(bytes, hasher, result)} + """ + } + } + + protected def genHashCalendarInterval(input: String, hasher: String, result: String): String = { + val microsecondsHash = s"$hasher.hashLong($input.microseconds, $result)" + s"$result = $hasher.hashInt($input.months, $microsecondsHash);" + } + + protected def genHashString(input: String, hasher: String, result: String): String = { + val baseObject = s"$input.getBaseObject()" + val baseOffset = s"$input.getBaseOffset()" + val numBytes = s"$input.numBytes()" + s"$result = $hasher.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);" + } + + protected def genHashForMap( + ctx: CodegenContext, + input: String, + result: String, + keyType: DataType, + valueType: DataType, + valueContainsNull: Boolean): String = { + val index = ctx.freshName("index") + val keys = ctx.freshName("keys") + val values = ctx.freshName("values") + s""" + final ArrayData $keys = 
$input.keyArray(); + final ArrayData $values = $input.valueArray(); + for (int $index = 0; $index < $input.numElements(); $index++) { + ${nullSafeElementHash(keys, index, false, keyType, result, ctx)} + ${nullSafeElementHash(values, index, valueContainsNull, valueType, result, ctx)} + } + """ + } + + protected def genHashForArray( + ctx: CodegenContext, + input: String, + result: String, + elementType: DataType, + containsNull: Boolean): String = { + val index = ctx.freshName("index") + s""" + for (int $index = 0; $index < $input.numElements(); $index++) { + ${nullSafeElementHash(input, index, containsNull, elementType, result, ctx)} + } + """ + } + + protected def genHashForStruct( + ctx: CodegenContext, + input: String, + result: String, + fields: Array[StructField]): String = { + fields.zipWithIndex.map { case (field, index) => + nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx) + }.mkString("\n") + } + @tailrec private def computeHash( input: String, @@ -284,64 +375,21 @@ abstract class HashExpression[E] extends Expression { ctx: CodegenContext): String = { val hasher = hasherClassName - def hashInt(i: String): String = s"$result = $hasher.hashInt($i, $result);" - def hashLong(l: String): String = s"$result = $hasher.hashLong($l, $result);" - def hashBytes(b: String): String = - s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, $result);" - dataType match { case NullType => "" - case BooleanType => hashInt(s"$input ? 
1 : 0") - case ByteType | ShortType | IntegerType | DateType => hashInt(input) - case LongType | TimestampType => hashLong(input) - case FloatType => hashInt(s"Float.floatToIntBits($input)") - case DoubleType => hashLong(s"Double.doubleToLongBits($input)") - case d: DecimalType => - if (d.precision <= Decimal.MAX_LONG_DIGITS) { - hashLong(s"$input.toUnscaledLong()") - } else { - val bytes = ctx.freshName("bytes") - s""" - final byte[] $bytes = $input.toJavaBigDecimal().unscaledValue().toByteArray(); - ${hashBytes(bytes)} - """ - } - case CalendarIntervalType => - val microsecondsHash = s"$hasher.hashLong($input.microseconds, $result)" - s"$result = $hasher.hashInt($input.months, $microsecondsHash);" - case BinaryType => hashBytes(input) - case StringType => - val baseObject = s"$input.getBaseObject()" - val baseOffset = s"$input.getBaseOffset()" - val numBytes = s"$input.numBytes()" - s"$result = $hasher.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);" - - case ArrayType(et, containsNull) => - val index = ctx.freshName("index") - s""" - for (int $index = 0; $index < $input.numElements(); $index++) { - ${nullSafeElementHash(input, index, containsNull, et, result, ctx)} - } - """ - + case BooleanType => genHashBoolean(input, hasher, result) + case ByteType | ShortType | IntegerType | DateType => genHashInt(input, hasher, result) + case LongType | TimestampType => genHashLong(input, hasher, result) + case FloatType => genHashFloat(input, hasher, result) + case DoubleType => genHashDouble(input, hasher, result) + case d: DecimalType => genHashDecimal(ctx, d, input, hasher, result) + case CalendarIntervalType => genHashCalendarInterval(input, hasher, result) + case BinaryType => genHashBytes(input, hasher, result) + case StringType => genHashString(input, hasher, result) + case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull) case MapType(kt, vt, valueContainsNull) => - val index = ctx.freshName("index") - val keys = 
ctx.freshName("keys") - val values = ctx.freshName("values") - s""" - final ArrayData $keys = $input.keyArray(); - final ArrayData $values = $input.valueArray(); - for (int $index = 0; $index < $input.numElements(); $index++) { - ${nullSafeElementHash(keys, index, false, kt, result, ctx)} - ${nullSafeElementHash(values, index, valueContainsNull, vt, result, ctx)} - } - """ - - case StructType(fields) => - fields.zipWithIndex.map { case (field, index) => - nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx) - }.mkString("\n") - + genHashForMap(ctx, input, result, kt, vt, valueContainsNull) + case StructType(fields) => genHashForStruct(ctx, input, result, fields) case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx) } } @@ -565,3 +613,219 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable { override def foldable: Boolean = true override def nullable: Boolean = false } + +/** + * Simulates Hive's hashing function at + * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode() in Hive + * + * We should use this hash function for both shuffle and bucket of Hive tables, so that + * we can guarantee shuffle and bucketing have same data distribution + * + * TODO: Support Decimal and date related types + */ +@ExpressionDescription( + usage = "_FUNC_(a1, a2, ...) 
- Returns a hash value of the arguments.") +case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { + override val seed = 0 + + override def dataType: DataType = IntegerType + + override def prettyName: String = "hive-hash" + + override protected def hasherClassName: String = classOf[HiveHasher].getName + + override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = { + HiveHashFunction.hash(value, dataType, seed).toInt + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + ev.isNull = "false" + val childHash = ctx.freshName("childHash") + val childrenHash = children.map { child => + val childGen = child.genCode(ctx) + childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) { + computeHash(childGen.value, child.dataType, childHash, ctx) + } + s"${ev.value} = (31 * ${ev.value}) + $childHash;" + }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "") + + ev.copy(code = s""" + ${ctx.javaType(dataType)} ${ev.value} = $seed; + $childrenHash""") + } + + @tailrec + private def computeHash( + input: String, + dataType: DataType, + result: String, + ctx: CodegenContext): String = { + val hasher = hasherClassName + + dataType match { + case NullType => "" + case BooleanType => genHashBoolean(input, hasher, result) + case ByteType | ShortType | IntegerType | DateType => genHashInt(input, hasher, result) + case LongType | TimestampType => genHashLong(input, hasher, result) + case FloatType => genHashFloat(input, hasher, result) + case DoubleType => genHashDouble(input, hasher, result) + case d: DecimalType => genHashDecimal(ctx, d, input, hasher, result) + case CalendarIntervalType => genHashCalendarInterval(input, hasher, result) + case BinaryType => genHashBytes(input, hasher, result) + case StringType => genHashString(input, hasher, result) + case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull) + case MapType(kt, vt, valueContainsNull) => + 
genHashForMap(ctx, input, result, kt, vt, valueContainsNull) + case StructType(fields) => genHashForStruct(ctx, input, result, fields) + case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx) + } + } + + override def eval(input: InternalRow): Int = { + var hash = seed + var i = 0 + val len = children.length + while (i < len) { + hash = (31 * hash) + computeHash(children(i).eval(input), children(i).dataType, hash) + i += 1 + } + hash + } + + override protected def genHashInt(i: String, hasher: String, result: String): String = + s"$result = $hasher.hashInt($i, 0);" + + override protected def genHashLong(l: String, hasher: String, result: String): String = + s"$result = $hasher.hashLong($l, 0);" + + override protected def genHashBytes(b: String, hasher: String, result: String): String = + s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, 0);" + + override protected def genHashForArray( + ctx: CodegenContext, + input: String, + result: String, + elementType: DataType, + containsNull: Boolean): String = { + val index = ctx.freshName("index") + val childResult = ctx.freshName("childResult") + s""" + int $childResult = 0; + for (int $index = 0; $index < $input.numElements(); $index++) { + $childResult = 0; + ${nullSafeElementHash(input, index, containsNull, elementType, childResult, ctx)}; + $result = (31 * $result) + $childResult; + } + """ + } + + override protected def genHashForMap( + ctx: CodegenContext, + input: String, + result: String, + keyType: DataType, + valueType: DataType, + valueContainsNull: Boolean): String = { + val index = ctx.freshName("index") + val keys = ctx.freshName("keys") + val values = ctx.freshName("values") + val keyResult = ctx.freshName("keyResult") + val valueResult = ctx.freshName("valueResult") + s""" + final ArrayData $keys = $input.keyArray(); + final ArrayData $values = $input.valueArray(); + int $keyResult = 0; + int $valueResult = 0; + for (int $index = 0; $index < 
$input.numElements(); $index++) { + $keyResult = 0; + ${nullSafeElementHash(keys, index, false, keyType, keyResult, ctx)} + $valueResult = 0; + ${nullSafeElementHash(values, index, valueContainsNull, valueType, valueResult, ctx)} + $result += $keyResult ^ $valueResult; + } + """ + } + + override protected def genHashForStruct( + ctx: CodegenContext, + input: String, + result: String, + fields: Array[StructField]): String = { + val localResult = ctx.freshName("localResult") + val childResult = ctx.freshName("childResult") + fields.zipWithIndex.map { case (field, index) => + s""" + $childResult = 0; + ${nullSafeElementHash(input, index.toString, field.nullable, field.dataType, + childResult, ctx)} + $localResult = (31 * $localResult) + $childResult; + """ + }.mkString( + s""" + int $localResult = 0; + int $childResult = 0; + """, + "", + s"$result = (31 * $result) + $localResult;" + ) + } +} + +object HiveHashFunction extends InterpretedHashFunction { + override protected def hashInt(i: Int, seed: Long): Long = { + HiveHasher.hashInt(i, seed) + } + + override protected def hashLong(l: Long, seed: Long): Long = { + HiveHasher.hashLong(l, seed) + } + + override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { + HiveHasher.hashUnsafeBytes(base, offset, len, seed) + } + + override def hash(value: Any, dataType: DataType, seed: Long): Long = { + value match { + case null => 0 + case array: ArrayData => + val elementType = dataType match { + case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType + case ArrayType(et, _) => et + } + var result: Int = 0 + for (i <- 0 until array.numElements()) { + result = (31 * result) + hash(array.get(i, elementType), elementType, 0).toInt + } + result + + case map: MapData => + val (kt, vt) = dataType match { + case udt: UserDefinedType[_] => + val mapType = udt.sqlType.asInstanceOf[MapType] + mapType.keyType -> mapType.valueType + case MapType(_kt, _vt, _) => _kt -> _vt + } 
+ val keys = map.keyArray() + val values = map.valueArray() + var result: Int = 0 + for (i <- 0 until map.numElements()) { + result += hash(keys.get(i, kt), kt, 0).toInt ^ hash(values.get(i, vt), vt, 0).toInt + } + result + + case struct: InternalRow => + val types: Array[DataType] = dataType match { + case udt: UserDefinedType[_] => + udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray + case StructType(fields) => fields.map(_.dataType) + } + + var result = 0 + for (i <- 0 until struct.numFields) { + result = (31 * result) + hash(struct.get(i, types(i)), types(i), seed + 1).toInt + } + result + + case _ => super.hash(value, dataType, seed) + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala index 33916c0891866..13ce588462028 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala @@ -145,7 +145,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get val encoder = RowEncoder(inputSchema) val seed = scala.util.Random.nextInt() - test(s"murmur3/xxHash64 hash: ${inputSchema.simpleString}") { + test(s"murmur3/xxHash64/hive hash: ${inputSchema.simpleString}") { for (_ <- 1 to 10) { val input = encoder.toRow(inputGenerator.apply().asInstanceOf[Row]).asInstanceOf[UnsafeRow] val literals = input.toSeq(inputSchema).zip(inputSchema.map(_.dataType)).map { @@ -154,6 +154,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { // Only test the interpreted version has same result with codegen version. 
checkEvaluation(Murmur3Hash(literals, seed), Murmur3Hash(literals, seed).eval()) checkEvaluation(XxHash64(literals, seed), XxHash64(literals, seed).eval()) + checkEvaluation(HiveHash(literals), HiveHash(literals).eval()) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala deleted file mode 100644 index 3230c868e7bff..0000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveHash.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.types.UTF8String - -/** - * Simulates Hive's hashing function at - * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode() in Hive - * - * We should use this hash function for both shuffle and bucket of Hive tables, so that - * we can guarantee shuffle and bucketing have same data distribution - * - * TODO: Support Decimal and date related types - */ -@ExpressionDescription( - usage = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments.") -case class HiveHash(children: Seq[Expression], seed: Int) extends HashExpression[Int] { - def this(arguments: Seq[Expression]) = this(arguments, 42) - - override def dataType: DataType = IntegerType - - override def prettyName: String = "hive-hash" - - override protected def hasherClassName: String = classOf[HiveHash].getName - - override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = { - HiveHashFunction.hash(value, dataType, seed).toInt - } -} - -object HiveHashFunction extends InterpretedHashFunction { - override protected def hashInt(i: Int, seed: Long): Long = { - HiveHasher.hashInt(i, seed.toInt) - } - - override protected def hashLong(l: Long, seed: Long): Long = { - HiveHasher.hashLong(l, seed.toInt) - } - - override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { - HiveHasher.hashUnsafeBytes(base, offset, len, seed.toInt) - } - - override def hash(value: Any, dataType: DataType, seed: Long): Long = { - value match { - case s: UTF8String => - val bytes = s.getBytes - var result: Int = 0 - var i = 0 - while (i < bytes.length) { - result = (result * 31) + bytes(i).toInt - i += 1 - } - result - - - case 
array: ArrayData => - val elementType = dataType match { - case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType - case ArrayType(et, _) => et - } - var result: Int = 0 - var i = 0 - while (i < array.numElements()) { - result = (31 * result) + hash(array.get(i, elementType), elementType, 0).toInt - i += 1 - } - result - - case map: MapData => - val (kt, vt) = dataType match { - case udt: UserDefinedType[_] => - val mapType = udt.sqlType.asInstanceOf[MapType] - mapType.keyType -> mapType.valueType - case MapType(_kt, _vt, _) => _kt -> _vt - } - val keys = map.keyArray() - val values = map.valueArray() - var result: Int = 0 - var i = 0 - while (i < map.numElements()) { - result += hash(keys.get(i, kt), kt, 0).toInt ^ hash(values.get(i, vt), vt, 0).toInt - i += 1 - } - result - - case struct: InternalRow => - val types: Array[DataType] = dataType match { - case udt: UserDefinedType[_] => - udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray - case StructType(fields) => fields.map(_.dataType) - } - - var i = 0 - var result = 0 - val len = struct.numFields - while (i < len) { - result = (31 * result) + hash(struct.get(i, types(i)), types(i), seed).toInt - i += 1 - } - result - - case _ => super.hash(value, dataType, seed) - } - } -} - -object HiveHasher { - def hashInt(input: Int, seed: Int): Int = input - - def hashLong(input: Long, seed: Int): Int = ((input >>> 32) ^ input).toInt - - def hashUnsafeBytes(base: AnyRef, offset: Long, lengthInBytes: Int, seed: Int): Int = { - assert(lengthInBytes >= 0, "lengthInBytes cannot be negative") - - var result: Int = 0 - var i: Int = lengthInBytes - lengthInBytes % 4 - while (i < lengthInBytes) { - result = (result * 31) + Platform.getByte(base, offset + i).toInt - i += 1 - } - result - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala deleted file mode 100644 index 
3a67901e54837..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveHashSuite.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.util - -import org.apache.hadoop.hive.serde2.io.ShortWritable -import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory._ -import org.apache.hadoop.io._ - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -class HiveHashSuite extends SparkFunSuite { - private val Seed = 0 - - def verifyHashes(pair: (DataType, Any, Writable, ObjectInspector)): Unit = { - val (dataType, value, writableValue, inspector) = pair - val expectedHash = ObjectInspectorUtils.hashCode(writableValue, inspector) - val actualHash = HiveHashFunction.hash(value, dataType, Seed) - assert(expectedHash == actualHash) - } - - test("primitive datatypes") { - val (boolTrue, boolFalse) = (true, false) - val (byte, short, integer, long) = (198.toByte, 5544.toShort, 123457878, 
89945787834323L) - val (float, double) = (12.65F, 1945.9009) - - Seq( - (BooleanType, boolTrue, new BooleanWritable(boolTrue), writableBooleanObjectInspector), - (BooleanType, boolFalse, new BooleanWritable(boolFalse), writableBooleanObjectInspector), - (ByteType, byte, new ByteWritable(byte), writableByteObjectInspector), - (ShortType, short, new ShortWritable(short), writableShortObjectInspector), - (IntegerType, integer, new IntWritable(integer), writableIntObjectInspector), - (LongType, long, new LongWritable(long), writableLongObjectInspector), - (FloatType, float, new FloatWritable(float), writableFloatObjectInspector), - (DoubleType, double, new DoubleWritable(double), writableDoubleObjectInspector) - ).foreach(verifyHashes) - } - - test("UTF8String data") { - val input = "my_test" - val text = new Text(input) - val inspector = writableStringObjectInspector - val expectedHash = ObjectInspectorUtils.hashCode(text, inspector) - - val utf8String = UTF8String.fromString(input) - val actualHash = HiveHashFunction.hash(utf8String, StringType, Seed) - assert(expectedHash == actualHash) - } - - test("array datatype") { - val array = Array(1, 2, 3) - val list = new util.ArrayList[IntWritable]() - array.foreach(item => list.add(new IntWritable(item))) - - val listInspector = - ObjectInspectorFactory.getStandardListObjectInspector(writableIntObjectInspector) - val expectedHash = ObjectInspectorUtils.hashCode(list, listInspector) - - val input = new GenericArrayData(array) - val actualHash = HiveHashFunction.hash(input, ArrayType(IntegerType), Seed) - assert(expectedHash == actualHash) - } - - test("map datatype") { - val keys = Array(1, 2, 3) - val values = Array(10, 20, 30) - val map = new util.LinkedHashMap[IntWritable, IntWritable]() - keys.zip(values).map { - case (k: Int, v: Int) => - map.put(new IntWritable(k), new IntWritable(v)) - } - - val listInspector = - ObjectInspectorFactory.getStandardMapObjectInspector( - writableIntObjectInspector, - 
writableIntObjectInspector) - - val expectedHash = ObjectInspectorUtils.hashCode(map, listInspector) - - val input = new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values)) - val actualHash = HiveHashFunction.hash(input, MapType(IntegerType, IntegerType), Seed) - assert(expectedHash == actualHash) - } -} From cf628911137a3880fe71c9217a43df716d717fcc Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 27 Sep 2016 23:35:40 -0700 Subject: [PATCH 3/4] addressing hvanhovell's review comments --- .../catalyst/expressions}/HiveHasher.java | 27 +-- .../spark/sql/catalyst/expressions/misc.scala | 169 +++++++++--------- .../expressions}/HiveHasherSuite.java | 34 ++-- .../org/apache/spark/sql/HashBenchmark.scala | 83 +++++---- .../spark/sql/HashByteArrayBenchmark.scala | 114 +++++++----- 5 files changed, 224 insertions(+), 203 deletions(-) rename common/unsafe/src/main/java/org/apache/spark/{unsafe/hash => sql/catalyst/expressions}/HiveHasher.java (71%) rename {common/unsafe/src/test/java/org/apache/spark/unsafe/hash => sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions}/HiveHasherSuite.java (73%) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java similarity index 71% rename from common/unsafe/src/main/java/org/apache/spark/unsafe/hash/HiveHasher.java rename to common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java index af8c8f28960fe..c7ea9085eba66 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/HiveHasher.java +++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.unsafe.hash; +package org.apache.spark.sql.catalyst.expressions; import org.apache.spark.unsafe.Platform; @@ -24,38 +24,21 @@ * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode() */ public class HiveHasher { - private final long seed; - - public HiveHasher() { - this.seed = 0; - } @Override public String toString() { - return "HiveHasher"; - } - - public int hashInt(int input) { - return hashInt(input, seed); - } - - public int hashLong(long input) { - return hashLong(input, seed); - } - - public int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { - return hashUnsafeBytes(base, offset, lengthInBytes, seed); + return HiveHasher.class.getSimpleName(); } - public static int hashInt(int input, long seed) { + public static int hashInt(int input) { return input; } - public static int hashLong(long input, long seed) { + public static int hashLong(long input) { return (int) ((input >>> 32) ^ input); } - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, long seed) { + public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; int result = 0; for (int i = 0; i < lengthInBytes; i++) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 568705659b1b4..10baf9c8e0c5e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.hash.{HiveHasher, Murmur3_x86_32} +import 
org.apache.spark.unsafe.hash.Murmur3_x86_32 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.unsafe.Platform @@ -276,51 +276,52 @@ abstract class HashExpression[E] extends Expression { } } - protected def genHashInt(i: String, hasher: String, result: String): String = - s"$result = $hasher.hashInt($i, $result);" + protected def genHashInt(i: String, result: String): String = + s"$result = $hasherClassName.hashInt($i, $result);" - protected def genHashLong(l: String, hasher: String, result: String): String = - s"$result = $hasher.hashLong($l, $result);" + protected def genHashLong(l: String, result: String): String = + s"$result = $hasherClassName.hashLong($l, $result);" - protected def genHashBytes(b: String, hasher: String, result: String): String = - s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, $result);" + protected def genHashBytes(b: String, result: String): String = { + val offset = "Platform.BYTE_ARRAY_OFFSET" + s"$result = $hasherClassName.hashUnsafeBytes($b, $offset, $b.length, $result);" + } - protected def genHashBoolean(input: String, hasher: String, result: String): String = - genHashInt(s"$input ? 1 : 0", hasher, result) + protected def genHashBoolean(input: String, result: String): String = + genHashInt(s"$input ? 
1 : 0", result) - protected def genHashFloat(input: String, hasher: String, result: String): String = - genHashInt(s"Float.floatToIntBits($input)", hasher, result) + protected def genHashFloat(input: String, result: String): String = + genHashInt(s"Float.floatToIntBits($input)", result) - protected def genHashDouble(input: String, hasher: String, result: String): String = - genHashLong(s"Double.doubleToLongBits($input)", hasher, result) + protected def genHashDouble(input: String, result: String): String = + genHashLong(s"Double.doubleToLongBits($input)", result) protected def genHashDecimal( ctx: CodegenContext, d: DecimalType, input: String, - hasher: String, result: String): String = { if (d.precision <= Decimal.MAX_LONG_DIGITS) { - genHashLong(s"$input.toUnscaledLong()", hasher, result) + genHashLong(s"$input.toUnscaledLong()", result) } else { val bytes = ctx.freshName("bytes") s""" final byte[] $bytes = $input.toJavaBigDecimal().unscaledValue().toByteArray(); - ${genHashBytes(bytes, hasher, result)} + ${genHashBytes(bytes, result)} """ } } - protected def genHashCalendarInterval(input: String, hasher: String, result: String): String = { - val microsecondsHash = s"$hasher.hashLong($input.microseconds, $result)" - s"$result = $hasher.hashInt($input.months, $microsecondsHash);" + protected def genHashCalendarInterval(input: String, result: String): String = { + val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)" + s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);" } - protected def genHashString(input: String, hasher: String, result: String): String = { + protected def genHashString(input: String, result: String): String = { val baseObject = s"$input.getBaseObject()" val baseOffset = s"$input.getBaseOffset()" val numBytes = s"$input.numBytes()" - s"$result = $hasher.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);" + s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, 
$result);" } protected def genHashForMap( @@ -368,31 +369,33 @@ abstract class HashExpression[E] extends Expression { } @tailrec - private def computeHash( + private def computeHashWithTailRec( input: String, dataType: DataType, result: String, - ctx: CodegenContext): String = { - val hasher = hasherClassName - - dataType match { - case NullType => "" - case BooleanType => genHashBoolean(input, hasher, result) - case ByteType | ShortType | IntegerType | DateType => genHashInt(input, hasher, result) - case LongType | TimestampType => genHashLong(input, hasher, result) - case FloatType => genHashFloat(input, hasher, result) - case DoubleType => genHashDouble(input, hasher, result) - case d: DecimalType => genHashDecimal(ctx, d, input, hasher, result) - case CalendarIntervalType => genHashCalendarInterval(input, hasher, result) - case BinaryType => genHashBytes(input, hasher, result) - case StringType => genHashString(input, hasher, result) - case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull) - case MapType(kt, vt, valueContainsNull) => - genHashForMap(ctx, input, result, kt, vt, valueContainsNull) - case StructType(fields) => genHashForStruct(ctx, input, result, fields) - case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx) - } - } + ctx: CodegenContext): String = dataType match { + case NullType => "" + case BooleanType => genHashBoolean(input, result) + case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result) + case LongType | TimestampType => genHashLong(input, result) + case FloatType => genHashFloat(input, result) + case DoubleType => genHashDouble(input, result) + case d: DecimalType => genHashDecimal(ctx, d, input, result) + case CalendarIntervalType => genHashCalendarInterval(input, result) + case BinaryType => genHashBytes(input, result) + case StringType => genHashString(input, result) + case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, 
containsNull) + case MapType(kt, vt, valueContainsNull) => + genHashForMap(ctx, input, result, kt, vt, valueContainsNull) + case StructType(fields) => genHashForStruct(ctx, input, result, fields) + case udt: UserDefinedType[_] => computeHashWithTailRec(input, udt.sqlType, result, ctx) + } + + protected def computeHash( + input: String, + dataType: DataType, + result: String, + ctx: CodegenContext): String = computeHashWithTailRec(input, dataType, result, ctx) protected def hasherClassName: String } @@ -653,33 +656,6 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { $childrenHash""") } - @tailrec - private def computeHash( - input: String, - dataType: DataType, - result: String, - ctx: CodegenContext): String = { - val hasher = hasherClassName - - dataType match { - case NullType => "" - case BooleanType => genHashBoolean(input, hasher, result) - case ByteType | ShortType | IntegerType | DateType => genHashInt(input, hasher, result) - case LongType | TimestampType => genHashLong(input, hasher, result) - case FloatType => genHashFloat(input, hasher, result) - case DoubleType => genHashDouble(input, hasher, result) - case d: DecimalType => genHashDecimal(ctx, d, input, hasher, result) - case CalendarIntervalType => genHashCalendarInterval(input, hasher, result) - case BinaryType => genHashBytes(input, hasher, result) - case StringType => genHashString(input, hasher, result) - case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull) - case MapType(kt, vt, valueContainsNull) => - genHashForMap(ctx, input, result, kt, vt, valueContainsNull) - case StructType(fields) => genHashForStruct(ctx, input, result, fields) - case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx) - } - } - override def eval(input: InternalRow): Int = { var hash = seed var i = 0 @@ -691,14 +667,28 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { hash } - override protected def 
genHashInt(i: String, hasher: String, result: String): String = - s"$result = $hasher.hashInt($i, 0);" + override protected def genHashInt(i: String, result: String): String = + s"$result = $hasherClassName.hashInt($i);" - override protected def genHashLong(l: String, hasher: String, result: String): String = - s"$result = $hasher.hashLong($l, 0);" + override protected def genHashLong(l: String, result: String): String = + s"$result = $hasherClassName.hashLong($l);" - override protected def genHashBytes(b: String, hasher: String, result: String): String = - s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, 0);" + override protected def genHashBytes(b: String, result: String): String = + s"$result = $hasherClassName.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length);" + + override protected def genHashCalendarInterval(input: String, result: String): String = { + s""" + $result = (31 * $hasherClassName.hashInt($input.months)) + + $hasherClassName.hashLong($input.microseconds); + """ + } + + override protected def genHashString(input: String, result: String): String = { + val baseObject = s"$input.getBaseObject()" + val baseOffset = s"$input.getBaseOffset()" + val numBytes = s"$input.numBytes()" + s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);" + } override protected def genHashForArray( ctx: CodegenContext, @@ -772,15 +762,15 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { object HiveHashFunction extends InterpretedHashFunction { override protected def hashInt(i: Int, seed: Long): Long = { - HiveHasher.hashInt(i, seed) + HiveHasher.hashInt(i) } override protected def hashLong(l: Long, seed: Long): Long = { - HiveHasher.hashLong(l, seed) + HiveHasher.hashLong(l) } override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { - HiveHasher.hashUnsafeBytes(base, offset, len, seed) + HiveHasher.hashUnsafeBytes(base, offset, len) } 
override def hash(value: Any, dataType: DataType, seed: Long): Long = { @@ -791,9 +781,13 @@ object HiveHashFunction extends InterpretedHashFunction { case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType case ArrayType(et, _) => et } - var result: Int = 0 - for (i <- 0 until array.numElements()) { + + var result = 0 + var i = 0 + val length = array.numElements() + while (i < length) { result = (31 * result) + hash(array.get(i, elementType), elementType, 0).toInt + i += 1 } result @@ -806,9 +800,13 @@ object HiveHashFunction extends InterpretedHashFunction { } val keys = map.keyArray() val values = map.valueArray() - var result: Int = 0 - for (i <- 0 until map.numElements()) { + + var result = 0 + var i = 0 + val length = map.numElements() + while (i < length) { result += hash(keys.get(i, kt), kt, 0).toInt ^ hash(values.get(i, vt), vt, 0).toInt + i += 1 } result @@ -820,8 +818,11 @@ object HiveHashFunction extends InterpretedHashFunction { } var result = 0 - for (i <- 0 until struct.numFields) { + var i = 0 + val length = struct.numFields + while (i < length) { result = (31 * result) + hash(struct.get(i, types(i)), types(i), seed + 1).toInt + i += 1 } result diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java similarity index 73% rename from common/unsafe/src/test/java/org/apache/spark/unsafe/hash/HiveHasherSuite.java rename to sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java index d4feefa8b2294..67a5eb0c7fe8f 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/HiveHasherSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.unsafe.hash; +package org.apache.spark.sql.catalyst.expressions; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.UTF8String; @@ -34,17 +34,17 @@ public class HiveHasherSuite { public void testKnownIntegerInputs() { int[] inputs = {0, Integer.MIN_VALUE, Integer.MAX_VALUE, 593689054, -189366624}; for (int input : inputs) { - Assert.assertEquals(input, hasher.hashInt(input)); + Assert.assertEquals(input, HiveHasher.hashInt(input)); } } @Test public void testKnownLongInputs() { - Assert.assertEquals(0, hasher.hashLong(0L)); - Assert.assertEquals(41, hasher.hashLong(-42L)); - Assert.assertEquals(42, hasher.hashLong(42L)); - Assert.assertEquals(-2147483648, hasher.hashLong(Long.MIN_VALUE)); - Assert.assertEquals(-2147483648, hasher.hashLong(Long.MAX_VALUE)); + Assert.assertEquals(0, HiveHasher.hashLong(0L)); + Assert.assertEquals(41, HiveHasher.hashLong(-42L)); + Assert.assertEquals(42, HiveHasher.hashLong(42L)); + Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MIN_VALUE)); + Assert.assertEquals(-2147483648, HiveHasher.hashLong(Long.MAX_VALUE)); } @Test @@ -54,7 +54,7 @@ public void testKnownStringAndIntInputs() { for (int i = 0; i < inputs.length; i++) { UTF8String s = UTF8String.fromString("val_" + inputs[i]); - int hash = hasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes()); + int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes()); Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash)); } } @@ -69,10 +69,10 @@ public void randomizedStressTest() { for (int i = 0; i < size; i++) { int vint = rand.nextInt(); long lint = rand.nextLong(); - Assert.assertEquals(hasher.hashInt(vint), hasher.hashInt(vint)); - Assert.assertEquals(hasher.hashLong(lint), hasher.hashLong(lint)); + Assert.assertEquals(HiveHasher.hashInt(vint), HiveHasher.hashInt(vint)); + Assert.assertEquals(HiveHasher.hashLong(lint), HiveHasher.hashLong(lint)); - 
hashcodes.add(hasher.hashLong(lint)); + hashcodes.add(HiveHasher.hashLong(lint)); } // A very loose bound. @@ -92,10 +92,10 @@ public void randomizedStressTestBytes() { rand.nextBytes(bytes); Assert.assertEquals( - hasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), - hasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); - hashcodes.add(hasher.hashUnsafeBytes( + hashcodes.add(HiveHasher.hashUnsafeBytes( bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); } @@ -115,10 +115,10 @@ public void randomizedStressTestPaddedStrings() { System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length); Assert.assertEquals( - hasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), - hasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); + HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); - hashcodes.add(hasher.hashUnsafeBytes( + hashcodes.add(HiveHasher.hashUnsafeBytes( paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala index c6a1a2be0d071..85a82e233c3c6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala @@ -76,29 +76,43 @@ object HashBenchmark { } } + val getHiveHashCode = UnsafeProjection.create(new HiveHash(attrs) :: Nil, attrs) + benchmark.addCase("codegen HiveHash version") { _: Int => + for (_ <- 0L until iters) { + var sum = 0 + var i = 0 + while (i < numRows) { + sum += getHiveHashCode(rows(i)).getInt(0) + i += 1 + } + } + } + benchmark.run() } def main(args: 
Array[String]): Unit = { val singleInt = new StructType().add("i", IntegerType) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 1006 / 1011 133.4 7.5 1.0X - codegen version 1835 / 1839 73.1 13.7 0.5X - codegen version 64-bit 1627 / 1628 82.5 12.1 0.6X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 3012 / 3168 178.2 5.6 1.0X + codegen version 6204 / 6509 86.5 11.6 0.5X + codegen version 64-bit 5666 / 5741 94.7 10.6 0.5X + codegen HiveHash version 4666 / 4691 115.1 8.7 0.6X */ test("single ints", singleInt, 1 << 15, 1 << 14) val singleLong = new StructType().add("i", LongType) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 1196 / 1209 112.2 8.9 1.0X - codegen version 2178 / 2181 61.6 16.2 0.5X - codegen version 64-bit 1752 / 1753 76.6 13.1 0.7X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 3610 / 3634 148.7 6.7 1.0X + codegen version 7302 / 7325 73.5 13.6 0.5X + codegen version 64-bit 7193 / 7226 74.6 13.4 0.5X + codegen HiveHash version 4769 / 5446 112.6 8.9 0.8X */ test("single longs", singleLong, 1 << 15, 1 << 14) @@ -118,13 +132,14 @@ object HashBenchmark { .add("date", DateType) .add("timestamp", TimestampType) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) 
Relative - ------------------------------------------------------------------------------------------- - interpreted version 2713 / 2715 0.8 1293.5 1.0X - codegen version 2015 / 2018 1.0 960.9 1.3X - codegen version 64-bit 735 / 738 2.9 350.7 3.7X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 2934 / 2944 0.7 1399.2 1.0X + codegen version 3513 / 3559 0.6 1675.2 0.8X + codegen version 64-bit 1063 / 1181 2.0 506.7 2.8X + codegen HiveHash version 5183 / 5367 0.4 2471.6 0.6X + */ test("normal", normal, 1 << 10, 1 << 11) val arrayOfInt = ArrayType(IntegerType) @@ -132,13 +147,14 @@ object HashBenchmark { .add("array", arrayOfInt) .add("arrayOfArray", ArrayType(arrayOfInt)) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - interpreted version 1498 / 1499 0.1 11432.1 1.0X - codegen version 2642 / 2643 0.0 20158.4 0.6X - codegen version 64-bit 2421 / 2424 0.1 18472.5 0.6X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 2697 / 2705 0.0 20573.9 1.0X + codegen version 4630 / 4650 0.0 35325.8 0.6X + codegen version 64-bit 4398 / 4697 0.0 33556.4 0.6X + codegen HiveHash version 2385 / 2606 0.1 18197.4 1.1X + */ test("array", array, 1 << 8, 1 << 9) val mapOfInt = MapType(IntegerType, IntegerType) @@ -146,13 +162,14 @@ object HashBenchmark { .add("map", mapOfInt) .add("mapOfMap", MapType(IntegerType, mapOfInt)) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - 
------------------------------------------------------------------------------------------- - interpreted version 1612 / 1618 0.0 393553.4 1.0X - codegen version 149 / 150 0.0 36381.2 10.8X - codegen version 64-bit 144 / 145 0.0 35122.1 11.2X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + interpreted version 0 / 0 78.6 12.7 1.0X + codegen version 239 / 251 0.0 58310.4 0.0X + codegen version 64-bit 212 / 231 0.0 51649.1 0.0X + codegen HiveHash version 80 / 88 0.1 19531.8 0.0X + */ test("map", map, 1 << 6, 1 << 6) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala index 53f21a8442429..35ebf64eb6bf6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.util.Random -import org.apache.spark.sql.catalyst.expressions.XXH64 +import org.apache.spark.sql.catalyst.expressions.{HiveHasher, XXH64} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.hash.Murmur3_x86_32 import org.apache.spark.util.Benchmark @@ -59,90 +59,110 @@ object HashByteArrayBenchmark { } } + benchmark.addCase("HiveHasher") { _: Int => + for (_ <- 0L until iters) { + var sum = 0L + var i = 0 + while (i < numArrays) { + sum += HiveHasher.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length) + i += 1 + } + } + } + benchmark.run() } def main(args: Array[String]): Unit = { /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 11 / 12 
185.1 5.4 1.0X - xxHash 64-bit 17 / 18 120.0 8.3 0.6X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 11 / 12 198.9 5.0 1.0X + xxHash 64-bit 16 / 19 130.1 7.7 0.7X + HiveHasher 0 / 0 282254.6 0.0 1419.0X */ test(8, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 18 / 18 118.6 8.4 1.0X - xxHash 64-bit 20 / 21 102.5 9.8 0.9X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 18 / 19 119.7 8.4 1.0X + xxHash 64-bit 19 / 21 109.9 9.1 0.9X + HiveHasher 0 / 0 281308.1 0.0 2349.8X */ test(16, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 24 / 24 86.6 11.5 1.0X - xxHash 64-bit 23 / 23 93.2 10.7 1.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 25 / 26 83.5 12.0 1.0X + xxHash 64-bit 22 / 23 95.9 10.4 1.1X + HiveHasher 0 / 0 281345.9 0.0 3367.5X */ test(24, 42L, 1 << 10, 1 << 11) // Add 31 to all arrays to create worse case alignment for xxHash. 
/* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 38 / 39 54.7 18.3 1.0X - xxHash 64-bit 33 / 33 64.4 15.5 1.2X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 37 / 38 57.0 17.5 1.0X + xxHash 64-bit 32 / 33 65.8 15.2 1.2X + HiveHasher 0 / 0 281761.7 0.0 4941.4X */ test(31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 91 / 94 22.9 43.6 1.0X - xxHash 64-bit 68 / 69 30.6 32.7 1.3X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 91 / 100 23.0 43.5 1.0X + xxHash 64-bit 68 / 71 31.0 32.3 1.3X + HiveHasher 0 / 0 281761.7 0.0 12256.3X */ test(64 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 268 / 268 7.8 127.6 1.0X - xxHash 64-bit 108 / 109 19.4 51.6 2.5X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 265 / 272 7.9 126.3 1.0X + xxHash 64-bit 107 / 114 19.7 50.8 2.5X + 
HiveHasher 0 / 0 281837.4 0.0 35592.6X */ test(256 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 942 / 945 2.2 449.4 1.0X - xxHash 64-bit 276 / 276 7.6 131.4 3.4X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 941 / 959 2.2 448.7 1.0X + xxHash 64-bit 266 / 278 7.9 126.8 3.5X + HiveHasher 0 / 0 282292.6 0.0 126654.5X */ test(1024 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 1839 / 1843 1.1 876.8 1.0X - xxHash 64-bit 445 / 448 4.7 212.1 4.1X + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Murmur3_x86_32 1912 / 1918 1.1 911.6 1.0X + xxHash 64-bit 463 / 503 4.5 220.7 4.1X + HiveHasher 0 / 0 281610.3 0.0 256709.1X */ test(2048 + 31, 42L, 1 << 10, 1 << 11) /* - Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz - Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Murmur3_x86_32 7307 / 7310 0.3 3484.4 1.0X - xxHash 64-bit 1487 / 1488 1.4 709.1 4.9X - */ + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + 
------------------------------------------------------------------------------------------------ + Murmur3_x86_32 10061 / 10102 0.2 4797.5 1.0X + xxHash 64-bit 2115 / 2221 1.0 1008.4 4.8X + HiveHasher 0 / 0 281044.2 0.0 1348297.1X + */ test(8192 + 31, 42L, 1 << 10, 1 << 11) } } From 238dbb80ead789229dd4022a320addedef648757 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 28 Sep 2016 13:32:09 -0700 Subject: [PATCH 4/4] fix benchmark --- .../org/apache/spark/sql/HashBenchmark.scala | 52 ++++++++-------- .../spark/sql/HashByteArrayBenchmark.scala | 60 +++++++++---------- 2 files changed, 56 insertions(+), 56 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala index 85a82e233c3c6..2d94b66a1e122 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala @@ -42,8 +42,8 @@ object HashBenchmark { val benchmark = new Benchmark("Hash For " + name, iters * numRows) benchmark.addCase("interpreted version") { _: Int => + var sum = 0 for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numRows) { sum += rows(i).hashCode() @@ -54,8 +54,8 @@ object HashBenchmark { val getHashCode = UnsafeProjection.create(new Murmur3Hash(attrs) :: Nil, attrs) benchmark.addCase("codegen version") { _: Int => + var sum = 0 for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numRows) { sum += getHashCode(rows(i)).getInt(0) @@ -66,8 +66,8 @@ object HashBenchmark { val getHashCode64b = UnsafeProjection.create(new XxHash64(attrs) :: Nil, attrs) benchmark.addCase("codegen version 64-bit") { _: Int => + var sum = 0 for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numRows) { sum += getHashCode64b(rows(i)).getInt(0) @@ -78,8 +78,8 @@ object HashBenchmark { val getHiveHashCode = UnsafeProjection.create(new HiveHash(attrs) :: Nil, attrs) 
benchmark.addCase("codegen HiveHash version") { _: Int => + var sum = 0 for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numRows) { sum += getHiveHashCode(rows(i)).getInt(0) @@ -97,11 +97,11 @@ object HashBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash For single ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - interpreted version 3012 / 3168 178.2 5.6 1.0X - codegen version 6204 / 6509 86.5 11.6 0.5X - codegen version 64-bit 5666 / 5741 94.7 10.6 0.5X - codegen HiveHash version 4666 / 4691 115.1 8.7 0.6X - */ + interpreted version 3262 / 3267 164.6 6.1 1.0X + codegen version 6448 / 6718 83.3 12.0 0.5X + codegen version 64-bit 6088 / 6154 88.2 11.3 0.5X + codegen HiveHash version 4732 / 4745 113.5 8.8 0.7X + */ test("single ints", singleInt, 1 << 15, 1 << 14) val singleLong = new StructType().add("i", LongType) @@ -109,11 +109,11 @@ object HashBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash For single longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - interpreted version 3610 / 3634 148.7 6.7 1.0X - codegen version 7302 / 7325 73.5 13.6 0.5X - codegen version 64-bit 7193 / 7226 74.6 13.4 0.5X - codegen HiveHash version 4769 / 5446 112.6 8.9 0.8X - */ + interpreted version 3716 / 3726 144.5 6.9 1.0X + codegen version 7706 / 7732 69.7 14.4 0.5X + codegen version 64-bit 6370 / 6399 84.3 11.9 0.6X + codegen HiveHash version 4924 / 5026 109.0 9.2 0.8X + */ test("single longs", singleLong, 1 << 15, 1 << 14) val normal = new StructType() @@ -135,10 +135,10 @@ object HashBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - interpreted version 2934 / 2944 0.7 1399.2 1.0X - 
codegen version 3513 / 3559 0.6 1675.2 0.8X - codegen version 64-bit 1063 / 1181 2.0 506.7 2.8X - codegen HiveHash version 5183 / 5367 0.4 2471.6 0.6X + interpreted version 2985 / 3013 0.7 1423.4 1.0X + codegen version 2422 / 2434 0.9 1155.1 1.2X + codegen version 64-bit 856 / 920 2.5 408.0 3.5X + codegen HiveHash version 4501 / 4979 0.5 2146.4 0.7X */ test("normal", normal, 1 << 10, 1 << 11) @@ -150,10 +150,10 @@ object HashBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - interpreted version 2697 / 2705 0.0 20573.9 1.0X - codegen version 4630 / 4650 0.0 35325.8 0.6X - codegen version 64-bit 4398 / 4697 0.0 33556.4 0.6X - codegen HiveHash version 2385 / 2606 0.1 18197.4 1.1X + interpreted version 3100 / 3555 0.0 23651.8 1.0X + codegen version 5779 / 5865 0.0 44088.4 0.5X + codegen version 64-bit 4738 / 4821 0.0 36151.7 0.7X + codegen HiveHash version 2200 / 2246 0.1 16785.9 1.4X */ test("array", array, 1 << 8, 1 << 9) @@ -165,10 +165,10 @@ object HashBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - interpreted version 0 / 0 78.6 12.7 1.0X - codegen version 239 / 251 0.0 58310.4 0.0X - codegen version 64-bit 212 / 231 0.0 51649.1 0.0X - codegen HiveHash version 80 / 88 0.1 19531.8 0.0X + interpreted version 0 / 0 48.1 20.8 1.0X + codegen version 257 / 275 0.0 62768.7 0.0X + codegen version 64-bit 226 / 240 0.0 55224.5 0.0X + codegen HiveHash version 89 / 96 0.0 21708.8 0.0X */ test("map", map, 1 << 6, 1 << 6) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala index 35ebf64eb6bf6..2a753a0c84ed5 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala @@ -38,8 +38,8 @@ object HashByteArrayBenchmark { val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays) benchmark.addCase("Murmur3_x86_32") { _: Int => + var sum = 0L for (_ <- 0L until iters) { - var sum = 0 var i = 0 while (i < numArrays) { sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42) @@ -49,8 +49,8 @@ object HashByteArrayBenchmark { } benchmark.addCase("xxHash 64-bit") { _: Int => + var sum = 0L for (_ <- 0L until iters) { - var sum = 0L var i = 0 while (i < numArrays) { sum += XXH64.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42) @@ -60,8 +60,8 @@ object HashByteArrayBenchmark { } benchmark.addCase("HiveHasher") { _: Int => + var sum = 0L for (_ <- 0L until iters) { - var sum = 0L var i = 0 while (i < numArrays) { sum += HiveHasher.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length) @@ -78,9 +78,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 8: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 11 / 12 198.9 5.0 1.0X - xxHash 64-bit 16 / 19 130.1 7.7 0.7X - HiveHasher 0 / 0 282254.6 0.0 1419.0X + Murmur3_x86_32 12 / 16 174.3 5.7 1.0X + xxHash 64-bit 17 / 22 120.0 8.3 0.7X + HiveHasher 13 / 15 162.1 6.2 0.9X */ test(8, 42L, 1 << 10, 1 << 11) @@ -88,9 +88,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 16: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 18 / 19 119.7 8.4 1.0X - xxHash 64-bit 19 / 21 109.9 9.1 0.9X - HiveHasher 0 / 0 281308.1 0.0 2349.8X 
+ Murmur3_x86_32 19 / 22 107.6 9.3 1.0X + xxHash 64-bit 20 / 24 104.6 9.6 1.0X + HiveHasher 24 / 28 87.0 11.5 0.8X */ test(16, 42L, 1 << 10, 1 << 11) @@ -98,9 +98,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 24: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 25 / 26 83.5 12.0 1.0X - xxHash 64-bit 22 / 23 95.9 10.4 1.1X - HiveHasher 0 / 0 281345.9 0.0 3367.5X + Murmur3_x86_32 28 / 32 74.8 13.4 1.0X + xxHash 64-bit 24 / 29 87.3 11.5 1.2X + HiveHasher 36 / 41 57.7 17.3 0.8X */ test(24, 42L, 1 << 10, 1 << 11) @@ -109,9 +109,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 31: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 37 / 38 57.0 17.5 1.0X - xxHash 64-bit 32 / 33 65.8 15.2 1.2X - HiveHasher 0 / 0 281761.7 0.0 4941.4X + Murmur3_x86_32 41 / 45 51.1 19.6 1.0X + xxHash 64-bit 36 / 44 58.8 17.0 1.2X + HiveHasher 49 / 54 42.6 23.5 0.8X */ test(31, 42L, 1 << 10, 1 << 11) @@ -119,9 +119,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 95: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 91 / 100 23.0 43.5 1.0X - xxHash 64-bit 68 / 71 31.0 32.3 1.3X - HiveHasher 0 / 0 281761.7 0.0 12256.3X + Murmur3_x86_32 100 / 110 21.0 47.7 1.0X + xxHash 64-bit 74 / 78 28.2 35.5 1.3X + HiveHasher 189 / 196 11.1 90.3 0.5X */ test(64 + 31, 42L, 1 << 10, 1 << 11) @@ -129,9 +129,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 287: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------ - Murmur3_x86_32 265 / 272 7.9 126.3 1.0X - xxHash 64-bit 107 / 114 19.7 50.8 2.5X - HiveHasher 0 / 0 281837.4 0.0 35592.6X + Murmur3_x86_32 299 / 311 7.0 142.4 1.0X + xxHash 64-bit 113 / 122 18.5 54.1 2.6X + HiveHasher 620 / 624 3.4 295.5 0.5X */ test(256 + 31, 42L, 1 << 10, 1 << 11) @@ -139,9 +139,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 1055: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 941 / 959 2.2 448.7 1.0X - xxHash 64-bit 266 / 278 7.9 126.8 3.5X - HiveHasher 0 / 0 282292.6 0.0 126654.5X + Murmur3_x86_32 1068 / 1070 2.0 509.1 1.0X + xxHash 64-bit 306 / 315 6.9 145.9 3.5X + HiveHasher 2316 / 2369 0.9 1104.3 0.5X */ test(1024 + 31, 42L, 1 << 10, 1 << 11) @@ -149,9 +149,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 2079: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 1912 / 1918 1.1 911.6 1.0X - xxHash 64-bit 463 / 503 4.5 220.7 4.1X - HiveHasher 0 / 0 281610.3 0.0 256709.1X + Murmur3_x86_32 2252 / 2274 0.9 1074.1 1.0X + xxHash 64-bit 534 / 580 3.9 254.6 4.2X + HiveHasher 4739 / 4786 0.4 2259.8 0.5X */ test(2048 + 31, 42L, 1 << 10, 1 << 11) @@ -159,9 +159,9 @@ object HashByteArrayBenchmark { Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz Hash byte arrays with length 8223: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Murmur3_x86_32 10061 / 10102 0.2 4797.5 1.0X - xxHash 64-bit 2115 / 2221 1.0 1008.4 4.8X - HiveHasher 0 / 0 281044.2 0.0 1348297.1X + Murmur3_x86_32 9249 / 9586 0.2 4410.5 1.0X + xxHash 64-bit 2897 / 
3241 0.7 1381.6 3.2X + HiveHasher 19392 / 20211 0.1 9246.6 0.5X */ test(8192 + 31, 42L, 1 << 10, 1 << 11) }