Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.Utils

////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -459,6 +460,36 @@ object SQLConf {
.intConf
.createWithDefault(4096)

// Adaptive enlargement of file-split sizes for columnar formats (Parquet/ORC), so that
// column pruning does not leave tasks reading far less than the configured partition size.
val IS_COLUMNAR_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.columnar.adaptiveFileSplit")
  .doc("When this is enabled, the partition size will be adaptively enlarged when reading " +
    "from columnar storage to make sure the actual input bytes of each task are close to " +
    "a proper partition size. When a user sets spark.sql.files.maxPartitionBytes, they " +
    "may expect it to be the upper bound of a single partition, with each task reading at " +
    "most this amount of data. For row-based files, that is right. But for columnar storage, " +
    "such as Parquet or ORC, each task may read much less data because of column pruning. " +
    "For example, a 1024 MB file may contain 10 columns, 5 of which are integer while " +
    "another 5 are long. If this is a row-based file, there will be 8 tasks, each of which " +
    "will read 128 MB. If this is a Parquet or ORC file and the job only reads a single " +
    "long column, there will also be 8 tasks, but each task will read much less than " +
    "128 MB because of column pruning.")
  .booleanConf
  .createWithDefault(false)

// Assumed per-value width (in bytes) of a struct column when estimating how much of a
// columnar file a pruned scan will actually read. Complex types have no reliable fixed
// defaultSize, so the estimate is user-configurable; StringType.defaultSize is used as a
// rough starting default. NOTE(review): presumably tuned per dataset — confirm with callers.
val COLUMNAR_STRUCT_LENGTH = buildConf("spark.sql.columnar.struct.length")
.doc("Set the default size of struct column")
.intConf
.createWithDefault(StringType.defaultSize)

// Assumed per-value width (in bytes) of a map column for the same split-size estimation.
val COLUMNAR_MAP_LENGTH = buildConf("spark.sql.columnar.map.length")
.doc("Set the default size of map column")
.intConf
.createWithDefault(StringType.defaultSize)

// Assumed per-value width (in bytes) of an array column for the same split-size estimation.
val COLUMNAR_ARRAY_LENGTH = buildConf("spark.sql.columnar.array.length")
.doc("Set the default size of array column")
.intConf
.createWithDefault(StringType.defaultSize)

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
Expand Down Expand Up @@ -1714,6 +1745,15 @@ class SQLConf extends Serializable with Logging {

def parquetRecordFilterEnabled: Boolean = getConf(PARQUET_RECORD_FILTER_ENABLED)

// Whether adaptive split-size enlargement for columnar formats is enabled
// (see spark.sql.columnar.adaptiveFileSplit).
def isColumnarStorageSplitSizeAdaptiveEnabled: Boolean =
getConf(IS_COLUMNAR_PARTITION_ADAPTIVE_ENABLED)

// Configured estimate (bytes) for the width of a struct column during split-size estimation.
def columnarStructTypeLength: Int = getConf(COLUMNAR_STRUCT_LENGTH)

// Configured estimate (bytes) for the width of a map column during split-size estimation.
def columnarMapTypeLength: Int = getConf(COLUMNAR_MAP_LENGTH)

// Configured estimate (bytes) for the width of an array column during split-size estimation.
def columnarArrayTypeLength: Int = getConf(COLUMNAR_ARRAY_LENGTH)

def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet

Expand Down Expand Up @@ -425,12 +426,44 @@ case class FileSourceScanExec(
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

if(fsRelation.sparkSession.sessionState.conf.isColumnarStorageSplitSizeAdaptiveEnabled &&
(fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
if (relation.dataSchema.map(_.dataType).forall(dataType =>
dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
|| dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
|| dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {

def getTypeLength(dataType: DataType): Int = {
if (dataType.isInstanceOf[StructType]) {
fsRelation.sparkSession.sessionState.conf.columnarStructTypeLength
} else if (dataType.isInstanceOf[ArrayType]) {
fsRelation.sparkSession.sessionState.conf.columnarArrayTypeLength
} else if (dataType.isInstanceOf[MapType]) {
fsRelation.sparkSession.sessionState.conf.columnarMapTypeLength
} else {
dataType.defaultSize
}
}

val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
.reduceOption(_ + _).getOrElse(StringType.defaultSize)
val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
.reduceOption(_ + _).getOrElse(StringType.defaultSize)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type based estimation is very rough. This is still hard for end users to decide the initial size.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile The goal of this change is not to make it easier for users to set the partition size. Instead, when a user sets the partition size, this change tries its best to make sure the read size is close to the value the user set. Without this change, when a user sets the partition size to 128 MB, the actual read size may be 1 MB or even smaller because of column pruning.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think his point is that the estimation is super rough which I agree with .. I am less sure if we should go ahead or not partially by this reason as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon I agree that the estimation is rough, especially for complex types. For AtomicType it works better, and at least it takes column pruning into consideration.

val multiplier = totalColumnSize / selectedColumnSize
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems here you can only get the ratio of selected columns to total columns. The actual type sizes are not put into consideration.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many data types: CalendarIntervalType, StructType, MapType, NullType, UserDefinedType, AtomicType (TimestampType, StringType, HiveStringType, BooleanType, DateType, BinaryType, NumericType), ObjectType, and ArrayType. For AtomicType, the size is fixed to the defaultSize. For complex types, such as StructType, MapType, and ArrayType, the size is variable, so I made it configurable with a default value. With the data type sizes, the multiplier is not merely the ratio of the number of selected columns to the total number of columns, but the ratio of the total size of the selected columns to the total size of all columns.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya As defined in getTypeLength, user can define the complex types' length as per the data statistics. And the length for AtomicType can be determined by AtomicType.defaultSize. So the multiplier is the ratio of the total length of the selected columns to the total length of all columns.

def getTypeLength (dataType : DataType) : Int = {
if (dataType.isInstanceOf[StructType]) {
fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
} else if (dataType.isInstanceOf[ArrayType]) {
fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
} else if (dataType.isInstanceOf[MapType]) {
fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
} else {
dataType.defaultSize
}
}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Now it also support ORC. Please help to review

maxSplitBytes = maxSplitBytes * multiplier
openCostInBytes = openCostInBytes * multiplier
}
}


logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

Expand Down