Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,21 @@ object IndexConstants {
val INDEX_SEARCH_PATHS = "spark.hyperspace.index.search.paths"
val INDEX_NUM_BUCKETS = "spark.hyperspace.index.num.buckets"

// This config enables Hybrid scan on mutable dataset at query time.
// Currently, this config allows to perform Hybrid scan on append-only dataset.
// For delete dataset, "spark.hyperspace.index.hybridscan.delete.enabled" is
// also needed to be set.
val INDEX_HYBRID_SCAN_ENABLED = "spark.hyperspace.index.hybridscan.enabled"
val INDEX_HYBRID_SCAN_ENABLED_DEFAULT = "false"

// This is a temporary config to support Hybrid scan on both append & delete dataset.
// The config does not work without the Hybrid scan config
// "spark.hyperspace.index.hybridscan.enabled"
// and will be removed after performance validation and optimization.
// See https://github.com/microsoft/hyperspace/issues/184
val INDEX_HYBRID_SCAN_DELETE_ENABLED = "spark.hyperspace.index.hybridscan.delete.enabled"
Comment thread
imback82 marked this conversation as resolved.
val INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT = "false"

// Identifier injected to HadoopFsRelation as an option if an index is applied.
// Currently, the identifier is added to options field of HadoopFsRelation.
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object FilterIndexRule
// Get candidate via file-level metadata validation. This is performed after pruning
// by column schema, as this might be expensive when there are numerous files in the
// relation or many indexes to be checked.
RuleUtils.getCandidateIndexes(candidateIndexes, r, HyperspaceConf.hybridScanEnabled(spark))
RuleUtils.getCandidateIndexes(spark, candidateIndexes, r)

case None => Nil // There is zero or more than one LogicalRelation nodes in Filter's subplan
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,13 @@ object JoinIndexRule
// by column schema, as this might be expensive when there are numerous files in the
// relation or many indexes to be checked.
val lIndexes = RuleUtils.getCandidateIndexes(
spark,
lUsable,
RuleUtils.getLogicalRelation(left).get,
HyperspaceConf.hybridScanEnabled(spark))
RuleUtils.getLogicalRelation(left).get)
val rIndexes = RuleUtils.getCandidateIndexes(
spark,
rUsable,
RuleUtils.getLogicalRelation(right).get,
HyperspaceConf.hybridScanEnabled(spark))
RuleUtils.getLogicalRelation(right).get)

val compatibleIndexPairs = getCompatibleIndexPairs(lIndexes, rIndexes, lRMap)

Expand Down
91 changes: 69 additions & 22 deletions src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, NamedExpression, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, RepartitionByExpression, Union}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructType}

import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntry, LogicalPlanSignatureProvider}
import com.microsoft.hyperspace.index.plans.logical.BucketUnion
Expand All @@ -40,18 +40,21 @@ object RuleUtils {
* index source files and the input files of the given plan. If there are some common
* files, the index is considered as a candidate.
*
* @param spark Spark Session.
* @param indexes List of available indexes.
* @param plan Logical plan.
* @param hybridScanEnabled Flag that checks if hybrid scan is enabled or disabled.
* @return Active indexes built for this plan.
*/
def getCandidateIndexes(
spark: SparkSession,
indexes: Seq[IndexLogEntry],
plan: LogicalPlan,
hybridScanEnabled: Boolean): Seq[IndexLogEntry] = {
plan: LogicalPlan): Seq[IndexLogEntry] = {
// Map of a signature provider to a signature generated for the given plan.
val signatureMap = mutable.Map[String, Option[String]]()

val hybridScanEnabled = HyperspaceConf.hybridScanEnabled(spark)
val hybridScanDeleteEnabled = HyperspaceConf.hybridScanDeleteEnabled(spark)

def signatureValid(entry: IndexLogEntry): Boolean = {
val sourcePlanSignatures = entry.source.plan.properties.fingerprint.properties.signatures
assert(sourcePlanSignatures.length == 1)
Expand All @@ -74,13 +77,16 @@ object RuleUtils {
// support arbitrary source plans at index creation.
// See https://github.com/microsoft/hyperspace/issues/158

// Find the number of common files and deleted files between the source relations
// & index source files.
// Find the number of common files between the source relations & index source files.
val commonCnt = inputSourceFiles.count(entry.allSourceFileInfos.contains)
val deletedCnt = entry.allSourceFileInfos.size - commonCnt

// Currently, Hybrid Scan only support for append-only dataset.
deletedCnt == 0 && commonCnt > 0
if (hybridScanDeleteEnabled && entry.hasLineageColumn(spark)) {
commonCnt > 0
} else {
// For append-only Hybrid Scan, deleted files are not allowed.
val deletedCnt = entry.allSourceFileInfos.size - commonCnt
deletedCnt == 0 && commonCnt > 0
}
}

if (hybridScanEnabled) {
Expand Down Expand Up @@ -140,7 +146,7 @@ object RuleUtils {
*
* Pre-requisites
* - We know for sure the index which can be used to transform the plan.
* - The plan should be linear and include 1 LogicalRelation.
* - The plan should be linear and include one LogicalRelation.
*
* @param spark Spark session.
* @param index Index used in transformation of plan.
Expand All @@ -153,6 +159,9 @@ object RuleUtils {
index: IndexLogEntry,
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
// Check pre-requisite.
assert(getLogicalRelation(plan).isDefined)

val transformed = if (HyperspaceConf.hybridScanEnabled(spark)) {
transformPlanToUseHybridScan(spark, index, plan, useBucketSpec)
} else {
Expand Down Expand Up @@ -225,25 +234,47 @@ object RuleUtils {
var unhandledAppendedFiles: Seq[Path] = Nil

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we please move this variable closer to its usage? Also please add a description for this variable next to declaration?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's located here as it's updated inside the following indexPlan block and there are some comment around the usage.


// Get transformed plan with index data and appended files if applicable.
val indexPlan = plan transformDown {
val indexPlan = plan transformUp {
// Use transformUp here as currently one Logical Relation is allowed (pre-requisite).
// The transformed plan will have LogicalRelation as a child; for example, LogicalRelation
// can be transformed to 'Project -> Filter -> Logical Relation'. Thus, with transformDown,
// it will be matched again and transformed recursively which causes stack overflow exception.
case baseRelation @ LogicalRelation(
_ @HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
baseOutput,
_,
_) =>
val curFileSet = location.allFiles
.map(f => FileInfo(f.getPath.toString, f.getLen, f.getModificationTime))
val filesAppended =
curFileSet.filterNot(index.allSourceFileInfos.contains).map(f => new Path(f.name))
// TODO: Hybrid Scan delete support.

val (filesDeleted, filesAppended) =
if (HyperspaceConf.hybridScanDeleteEnabled(spark) && index.hasLineageColumn(spark)) {
val (exist, nonExist) = curFileSet.partition(index.allSourceFileInfos.contains)
val filesAppended = nonExist.map(f => new Path(f.name))
if (exist.length < index.allSourceFileInfos.size) {
(index.allSourceFileInfos -- exist, filesAppended)
} else {
(Nil, filesAppended)
}
Comment on lines +254 to +258

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we replace this section with just

(index.allSourceFileInfos -- exist, filesAppended)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the current impl. to avoid unnecessary comparisons.

} else {
// Append-only implementation of getting appended files for efficiency.
// It is guaranteed that there is no deleted files via the condition
// 'deletedCnt == 0 && commonCnt > 0' in isHybridScanCandidate function.
(
Nil,
curFileSet.filterNot(index.allSourceFileInfos.contains).map(f => new Path(f.name)))
}

val filesToRead = {
if (useBucketSpec || !isParquetSourceFormat) {
if (useBucketSpec || !isParquetSourceFormat || filesDeleted.nonEmpty) {
Comment thread
sezruby marked this conversation as resolved.
// Since the index data is in "parquet" format, we cannot read source files
// in formats other than "parquet" using 1 FileScan node as the operator requires
// in formats other than "parquet" using one FileScan node as the operator requires
// files in one homogenous format. To address this, we need to read the appended
// source files using another FileScan node injected into the plan and subsequently
// merge the data into the index data. Please refer below [[Union]] operation.
// In case there are both deleted and appended files, we cannot handle the appended
// files along with deleted files as source files do not have the lineage column which
// is required for excluding the index data from deleted files.
unhandledAppendedFiles = filesAppended
index.content.files
} else {
Expand All @@ -255,18 +286,35 @@ object RuleUtils {
}
}

// In order to handle deleted files, read index data with the lineage column so that
// we could inject Filter-Not-In conditions on the lineage column to exclude the indexed
// rows from the deleted files.
val newSchema = StructType(
index.schema.filter(s =>
baseRelation.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals(
IndexConstants.DATA_FILE_NAME_COLUMN))))

val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), None)
val relation = HadoopFsRelation(
newLocation,
new StructType(),
StructType(index.schema.filter(baseRelation.schema.contains(_))),
newSchema,
if (useBucketSpec) Some(index.bucketSpec) else None,
new ParquetFileFormat,
Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)

val updatedOutput =
baseOutput.filter(attr => relation.schema.fieldNames.contains(attr.name))
baseRelation.copy(relation = relation, output = updatedOutput)

if (filesDeleted.isEmpty) {
baseRelation.copy(relation = relation, output = updatedOutput)
} else {
val lineageAttr = AttributeReference(IndexConstants.DATA_FILE_NAME_COLUMN, StringType)()
val deletedFileNames = filesDeleted.map(f => Literal(f.name)).toArray
val rel =
baseRelation.copy(relation = relation, output = updatedOutput ++ Seq(lineageAttr))
Project(updatedOutput, Filter(Not(In(lineageAttr, deletedFileNames)), rel))
}
}

if (unhandledAppendedFiles.nonEmpty) {
Expand Down Expand Up @@ -333,8 +381,7 @@ object RuleUtils {
location = newLocation,
dataSchema = StructType(indexSchema.filter(baseRelation.schema.contains(_))),
options =
fsRelation.options + IndexConstants.INDEX_RELATION_IDENTIFIER)(
spark)
fsRelation.options + IndexConstants.INDEX_RELATION_IDENTIFIER)(spark)
baseRelation.copy(relation = newRelation, output = updatedOutput)
}
assert(!originalPlan.equals(planForAppended))
Expand All @@ -345,7 +392,7 @@ object RuleUtils {
* Transform the plan to perform on-the-fly Shuffle the data based on bucketSpec.
*
* Pre-requisite
* - The plan should be linear and include 1 LogicalRelation.
* - The plan should be linear and include one LogicalRelation.
*
* @param bucketSpec Bucket specification used for Shuffle.
* @param plan Plan to be shuffled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ object HyperspaceConf {
.toBoolean
}

def hybridScanDeleteEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED,
IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT)
.toBoolean
}

def refreshDeleteEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(IndexConstants.REFRESH_DELETE_ENABLED,
Expand Down
Loading