diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 3459c21b4..98b234a79 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -27,9 +27,21 @@ object IndexConstants { val INDEX_SEARCH_PATHS = "spark.hyperspace.index.search.paths" val INDEX_NUM_BUCKETS = "spark.hyperspace.index.num.buckets" + // This config enables Hybrid scan on mutable datasets at query time. + // Currently, this config alone enables Hybrid scan only on append-only datasets. + // For datasets with deleted files, "spark.hyperspace.index.hybridscan.delete.enabled" + // must also be set. val INDEX_HYBRID_SCAN_ENABLED = "spark.hyperspace.index.hybridscan.enabled" val INDEX_HYBRID_SCAN_ENABLED_DEFAULT = "false" + // This is a temporary config to support Hybrid scan on datasets with appended & deleted files. + // The config has no effect without the Hybrid scan config + // "spark.hyperspace.index.hybridscan.enabled" + // and will be removed after performance validation and optimization. + // See https://github.com/microsoft/hyperspace/issues/184 + val INDEX_HYBRID_SCAN_DELETE_ENABLED = "spark.hyperspace.index.hybridscan.delete.enabled" + val INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT = "false" + // Identifier injected to HadoopFsRelation as an option if an index is applied. // Currently, the identifier is added to options field of HadoopFsRelation. // In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan. 
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala index 9b163179c..070f71cfd 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala @@ -121,7 +121,7 @@ object FilterIndexRule // Get candidate via file-level metadata validation. This is performed after pruning // by column schema, as this might be expensive when there are numerous files in the // relation or many indexes to be checked. - RuleUtils.getCandidateIndexes(candidateIndexes, r, HyperspaceConf.hybridScanEnabled(spark)) + RuleUtils.getCandidateIndexes(spark, candidateIndexes, r) case None => Nil // There is zero or more than one LogicalRelation nodes in Filter's subplan } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala index 89cae377e..1726092f2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala @@ -316,13 +316,13 @@ object JoinIndexRule // by column schema, as this might be expensive when there are numerous files in the // relation or many indexes to be checked. 
val lIndexes = RuleUtils.getCandidateIndexes( + spark, lUsable, - RuleUtils.getLogicalRelation(left).get, - HyperspaceConf.hybridScanEnabled(spark)) + RuleUtils.getLogicalRelation(left).get) val rIndexes = RuleUtils.getCandidateIndexes( + spark, rUsable, - RuleUtils.getLogicalRelation(right).get, - HyperspaceConf.hybridScanEnabled(spark)) + RuleUtils.getLogicalRelation(right).get) val compatibleIndexPairs = getCompatibleIndexPairs(lIndexes, rIndexes, lRMap) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 9dfff71ca..aeb7eb4ed 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -21,11 +21,11 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, NamedExpression, Not} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, RepartitionByExpression, Union} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntry, LogicalPlanSignatureProvider} import com.microsoft.hyperspace.index.plans.logical.BucketUnion @@ -40,18 +40,21 @@ object RuleUtils { * index source files and the input files of the given plan. If there are some common * files, the index is considered as a candidate. * + * @param spark Spark Session. 
* @param indexes List of available indexes. * @param plan Logical plan. - * @param hybridScanEnabled Flag that checks if hybrid scan is enabled or disabled. * @return Active indexes built for this plan. */ def getCandidateIndexes( + spark: SparkSession, indexes: Seq[IndexLogEntry], - plan: LogicalPlan, - hybridScanEnabled: Boolean): Seq[IndexLogEntry] = { + plan: LogicalPlan): Seq[IndexLogEntry] = { // Map of a signature provider to a signature generated for the given plan. val signatureMap = mutable.Map[String, Option[String]]() + val hybridScanEnabled = HyperspaceConf.hybridScanEnabled(spark) + val hybridScanDeleteEnabled = HyperspaceConf.hybridScanDeleteEnabled(spark) + def signatureValid(entry: IndexLogEntry): Boolean = { val sourcePlanSignatures = entry.source.plan.properties.fingerprint.properties.signatures assert(sourcePlanSignatures.length == 1) @@ -74,13 +77,16 @@ object RuleUtils { // support arbitrary source plans at index creation. // See https://github.com/microsoft/hyperspace/issues/158 - // Find the number of common files and deleted files between the source relations - // & index source files. + // Find the number of common files between the source relations & index source files. val commonCnt = inputSourceFiles.count(entry.allSourceFileInfos.contains) - val deletedCnt = entry.allSourceFileInfos.size - commonCnt - // Currently, Hybrid Scan only support for append-only dataset. - deletedCnt == 0 && commonCnt > 0 + if (hybridScanDeleteEnabled && entry.hasLineageColumn(spark)) { + commonCnt > 0 + } else { + // For append-only Hybrid Scan, deleted files are not allowed. + val deletedCnt = entry.allSourceFileInfos.size - commonCnt + deletedCnt == 0 && commonCnt > 0 + } } if (hybridScanEnabled) { @@ -140,7 +146,7 @@ object RuleUtils { * * Pre-requisites * - We know for sure the index which can be used to transform the plan. - * - The plan should be linear and include 1 LogicalRelation. + * - The plan should be linear and include one LogicalRelation. 
* * @param spark Spark session. * @param index Index used in transformation of plan. @@ -153,6 +159,9 @@ object RuleUtils { index: IndexLogEntry, plan: LogicalPlan, useBucketSpec: Boolean): LogicalPlan = { + // Check pre-requisite. + assert(getLogicalRelation(plan).isDefined) + val transformed = if (HyperspaceConf.hybridScanEnabled(spark)) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) } else { @@ -225,7 +234,11 @@ object RuleUtils { var unhandledAppendedFiles: Seq[Path] = Nil // Get transformed plan with index data and appended files if applicable. - val indexPlan = plan transformDown { + val indexPlan = plan transformUp { + // Use transformUp here as currently one Logical Relation is allowed (pre-requisite). + // The transformed plan will have LogicalRelation as a child; for example, LogicalRelation + // can be transformed to 'Project -> Filter -> Logical Relation'. Thus, with transformDown, + // it will be matched again and transformed recursively which causes stack overflow exception. case baseRelation @ LogicalRelation( _ @HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), baseOutput, @@ -233,17 +246,35 @@ object RuleUtils { _) => val curFileSet = location.allFiles .map(f => FileInfo(f.getPath.toString, f.getLen, f.getModificationTime)) - val filesAppended = - curFileSet.filterNot(index.allSourceFileInfos.contains).map(f => new Path(f.name)) - // TODO: Hybrid Scan delete support. + + val (filesDeleted, filesAppended) = + if (HyperspaceConf.hybridScanDeleteEnabled(spark) && index.hasLineageColumn(spark)) { + val (exist, nonExist) = curFileSet.partition(index.allSourceFileInfos.contains) + val filesAppended = nonExist.map(f => new Path(f.name)) + if (exist.length < index.allSourceFileInfos.size) { + (index.allSourceFileInfos -- exist, filesAppended) + } else { + (Nil, filesAppended) + } + } else { + // Append-only implementation of getting appended files for efficiency. 
+ // It is guaranteed that there are no deleted files via the condition + // 'deletedCnt == 0 && commonCnt > 0' in the isHybridScanCandidate function. + ( + Nil, + curFileSet.filterNot(index.allSourceFileInfos.contains).map(f => new Path(f.name))) + } val filesToRead = { - if (useBucketSpec || !isParquetSourceFormat) { + if (useBucketSpec || !isParquetSourceFormat || filesDeleted.nonEmpty) { // Since the index data is in "parquet" format, we cannot read source files - // in formats other than "parquet" using 1 FileScan node as the operator requires + // in formats other than "parquet" using one FileScan node as the operator requires // files in one homogenous format. To address this, we need to read the appended // source files using another FileScan node injected into the plan and subsequently // merge the data into the index data. Please refer below [[Union]] operation. + // In case there are both deleted and appended files, we cannot handle the appended + // files along with deleted files as source files do not have the lineage column which + // is required for excluding the index data from deleted files. unhandledAppendedFiles = filesAppended index.content.files } else { @@ -255,18 +286,35 @@ object RuleUtils { } } + // In order to handle deleted files, read index data with the lineage column so that + // we can inject Filter-Not-In conditions on the lineage column to exclude the indexed + // rows from the deleted files. 
+ val newSchema = StructType( + index.schema.filter(s => + baseRelation.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals( + IndexConstants.DATA_FILE_NAME_COLUMN)))) + val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), None) val relation = HadoopFsRelation( newLocation, new StructType(), - StructType(index.schema.filter(baseRelation.schema.contains(_))), + newSchema, if (useBucketSpec) Some(index.bucketSpec) else None, new ParquetFileFormat, Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark) val updatedOutput = baseOutput.filter(attr => relation.schema.fieldNames.contains(attr.name)) - baseRelation.copy(relation = relation, output = updatedOutput) + + if (filesDeleted.isEmpty) { + baseRelation.copy(relation = relation, output = updatedOutput) + } else { + val lineageAttr = AttributeReference(IndexConstants.DATA_FILE_NAME_COLUMN, StringType)() + val deletedFileNames = filesDeleted.map(f => Literal(f.name)).toArray + val rel = + baseRelation.copy(relation = relation, output = updatedOutput ++ Seq(lineageAttr)) + Project(updatedOutput, Filter(Not(In(lineageAttr, deletedFileNames)), rel)) + } } if (unhandledAppendedFiles.nonEmpty) { @@ -333,8 +381,7 @@ object RuleUtils { location = newLocation, dataSchema = StructType(indexSchema.filter(baseRelation.schema.contains(_))), options = - fsRelation.options + IndexConstants.INDEX_RELATION_IDENTIFIER)( - spark) + fsRelation.options + IndexConstants.INDEX_RELATION_IDENTIFIER)(spark) baseRelation.copy(relation = newRelation, output = updatedOutput) } assert(!originalPlan.equals(planForAppended)) @@ -345,7 +392,7 @@ object RuleUtils { * Transform the plan to perform on-the-fly Shuffle the data based on bucketSpec. * * Pre-requisite - * - The plan should be linear and include 1 LogicalRelation. + * - The plan should be linear and include one LogicalRelation. * * @param bucketSpec Bucket specification used for Shuffle. * @param plan Plan to be shuffled. 
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index ccf4b172b..655391b91 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -32,6 +32,14 @@ object HyperspaceConf { .toBoolean } + def hybridScanDeleteEnabled(spark: SparkSession): Boolean = { + spark.conf + .get( + IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED, + IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT) + .toBoolean + } + def refreshDeleteEnabled(spark: SparkSession): Boolean = { spark.conf .get(IndexConstants.REFRESH_DELETE_ENABLED, diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanTest.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanTest.scala index 6e0aa57a4..82e1af9a7 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanTest.scala @@ -19,9 +19,9 @@ package com.microsoft.hyperspace.index import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, QueryTest} -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, Not} import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RepartitionByExpression, Union} -import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, UnionExec} +import org.apache.spark.sql.execution.{FileSourceScanExec, InputAdapter, ProjectExec, UnionExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.internal.SQLConf @@ -39,15 +39,24 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { private val sampleDataLocationRoot = "src/test/resources/data/" 
private val sampleParquetDataLocationAppend = sampleDataLocationRoot + "sampleparquet0" private val sampleParquetDataLocationAppend2 = sampleDataLocationRoot + "sampleparquet1" + private val sampleParquetDataLocationDelete = sampleDataLocationRoot + "sampleparquet2" + private val sampleParquetDataLocationDelete2 = sampleDataLocationRoot + "sampleparquet3" + private val sampleParquetDataLocationDelete3 = sampleDataLocationRoot + "sampleparquet4" + private val sampleParquetDataLocationBoth = sampleDataLocationRoot + "sampleparquet5" private val sampleJsonDataLocationAppend = sampleDataLocationRoot + "samplejson1" + private val sampleJsonDataLocationDelete = sampleDataLocationRoot + "samplejson2" private var hyperspace: Hyperspace = _ - // Creates an index with given 'df' and 'indexConfig' and copies the first 'appendCnt' - // number of input files from 'df'. - def setupIndexAndAppendData(df: DataFrame, indexConfig: IndexConfig, appendCnt: Int): Unit = { + // Creates an index with given 'df' and 'indexConfig'. Then copies the first 'appendCnt' + // number of input files from 'df' and deletes the last 'deleteCnt' of the input files. 
+ def setupIndexAndChangeData( + df: DataFrame, + indexConfig: IndexConfig, + appendCnt: Int, + deleteCnt: Int): Unit = { hyperspace.createIndex(df, indexConfig) val inputFiles = df.inputFiles - assert(appendCnt < inputFiles.length) + assert(appendCnt + deleteCnt < inputFiles.length) val fs = systemPath.getFileSystem(new Configuration) for (i <- 0 until appendCnt) { @@ -55,6 +64,9 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { val destPath = new Path(inputFiles(i) + ".copy") fs.copyToLocalFile(sourcePath, destPath) } + for (i <- 1 to deleteCnt) { + fs.delete(new Path(inputFiles(inputFiles.length - i)), false) + } } override def beforeAll(): Unit = { @@ -65,22 +77,54 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { val dfFromSample = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks") dfFromSample.write.parquet(sampleParquetDataLocationAppend) dfFromSample.write.parquet(sampleParquetDataLocationAppend2) + dfFromSample.write.parquet(sampleParquetDataLocationDelete) + dfFromSample.write.parquet(sampleParquetDataLocationDelete2) + dfFromSample.write.parquet(sampleParquetDataLocationDelete3) + dfFromSample.write.parquet(sampleParquetDataLocationBoth) dfFromSample.write.json(sampleJsonDataLocationAppend) + dfFromSample.write.json(sampleJsonDataLocationDelete) + + val indexConfig1 = IndexConfig("indexType1", Seq("clicks"), Seq("query")) + val indexConfig2 = IndexConfig("indexType2", Seq("clicks"), Seq("Date")) - val indexConfig1 = IndexConfig("index1", Seq("clicks"), Seq("query")) - val indexConfig2 = IndexConfig("index11", Seq("clicks"), Seq("Date")) - setupIndexAndAppendData( + setupIndexAndChangeData( spark.read.parquet(sampleParquetDataLocationAppend), - indexConfig1, - appendCnt = 1) - setupIndexAndAppendData( + indexConfig1.copy(indexName = "index_ParquetAppend"), + appendCnt = 1, + deleteCnt = 0) + setupIndexAndChangeData( spark.read.parquet(sampleParquetDataLocationAppend2), - indexConfig2, - appendCnt = 1) - 
setupIndexAndAppendData( + indexConfig2.copy(indexName = "indexType2_ParquetAppend2"), + appendCnt = 1, + deleteCnt = 0) + setupIndexAndChangeData( spark.read.json(sampleJsonDataLocationAppend), - indexConfig1.copy(indexName = "index4"), - appendCnt = 1) + indexConfig1.copy(indexName = "index_JsonAppend"), + appendCnt = 1, + deleteCnt = 0) + + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + setupIndexAndChangeData( + spark.read.parquet(sampleParquetDataLocationDelete), + indexConfig1.copy(indexName = "index_ParquetDelete"), + appendCnt = 0, + deleteCnt = 2) + setupIndexAndChangeData( + spark.read.parquet(sampleParquetDataLocationDelete3), + indexConfig2.copy(indexName = "indexType2_ParquetDelete3"), + appendCnt = 0, + deleteCnt = 2) + setupIndexAndChangeData( + spark.read.parquet(sampleParquetDataLocationBoth), + indexConfig1.copy(indexName = "index_ParquetBoth"), + appendCnt = 1, + deleteCnt = 1) + setupIndexAndChangeData( + spark.read.json(sampleJsonDataLocationDelete), + indexConfig1.copy(indexName = "index_JsonDelete"), + appendCnt = 0, + deleteCnt = 2) + } } before { @@ -120,7 +164,7 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { // Verify appended file is included or not. assert(fsRelation.location.inputFiles.count(_.contains(".copy")) === 1) // Verify number of index data files. 
- assert(fsRelation.location.inputFiles.count(_.contains("index1")) === 4) + assert(fsRelation.location.inputFiles.count(_.contains("index_ParquetAppend")) === 4) assert(fsRelation.location.inputFiles.length === 5) p } @@ -158,7 +202,7 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { case b @ BucketUnion(children, bucketSpec) => assert(bucketSpec.numBuckets === 200) assert( - bucketSpec.bucketColumnNames.size == 1 && bucketSpec.bucketColumnNames.head + bucketSpec.bucketColumnNames.size === 1 && bucketSpec.bucketColumnNames.head .equals("clicks")) val childNodes = children.collect { @@ -166,7 +210,7 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { attrs, Project(_, Filter(_, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))), numBucket) => - assert(attrs.size == 1) + assert(attrs.size === 1) assert(attrs.head.asInstanceOf[Attribute].name.contains("clicks")) // Check 1 appended file. assert(fsRelation.location.inputFiles.forall(_.contains(".copy"))) @@ -177,15 +221,18 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { _, Filter(_, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))) => // Check 4 of index data files. - assert(fsRelation.location.inputFiles.forall(_.contains("index"))) + assert( + fsRelation.location.inputFiles.forall(_.contains("index_ParquetAppend")) + || fsRelation.location.inputFiles + .forall(_.contains("indexType2_ParquetAppend2"))) assert(fsRelation.location.inputFiles.length === 4) p } // BucketUnion has 2 children. assert(childNodes.size === 2) - assert(childNodes.count(_.isInstanceOf[Project]) == 1) - assert(childNodes.count(_.isInstanceOf[RepartitionByExpression]) == 1) + assert(childNodes.count(_.isInstanceOf[Project]) === 1) + assert(childNodes.count(_.isInstanceOf[RepartitionByExpression]) === 1) b } // 2 BucketUnion in Join Rule v1. 
@@ -247,7 +294,7 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { fileFormatName match { case "parquet" => // Check 4 of index data files. - assert(fsRelation.location.inputFiles.forall(_.contains("index"))) + assert(fsRelation.location.inputFiles.forall(_.contains("index_JsonAppend"))) assert(fsRelation.location.inputFiles.length === 4) case "json" => // Check 1 appended file. @@ -291,4 +338,362 @@ class HybridScanTest extends QueryTest with HyperspaceSuite { } } } + + test( + "Delete-only: filter index & parquet format, " + + "Hybrid Scan for delete support doesn't work without lineage column") { + // Setup index without lineage column. + val indexConfig = IndexConfig("index_ParquetDelete2", Seq("clicks"), Seq("query")) + Seq( + ("indexNameWithoutLineage", "false", false), + ("indexNameWithLineage", "true", true)) foreach { + case (indexName, lineageColumnConfig, transformationExpected) => + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> lineageColumnConfig) { + setupIndexAndChangeData( + spark.read.parquet(sampleParquetDataLocationDelete2), + indexConfig.copy(indexName = indexName), + appendCnt = 0, + deleteCnt = 1) + + val df = spark.read.parquet(sampleParquetDataLocationDelete2) + def filterQuery: DataFrame = + df.filter(df("clicks") <= 2000).select(df("query")) + val baseQuery = filterQuery + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "false") { + val filter = filterQuery + assert( + baseQuery.queryExecution.optimizedPlan.equals(filter.queryExecution.optimizedPlan)) + } + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") { + val filter = filterQuery + assert( + baseQuery.queryExecution.optimizedPlan + .equals(filter.queryExecution.optimizedPlan) + .equals(!transformationExpected)) + } + } + } + } + + test( + "Delete-only: filter index & parquet, json format, " + + "index 
relation should have additional filter for deleted files") { + Seq( + (sampleParquetDataLocationDelete, "index_ParquetDelete", "parquet"), + (sampleJsonDataLocationDelete, "index_JsonDelete", "json")) foreach { + case (dataPath, indexName, dataFormat) => + val df = spark.read.format(dataFormat).load(dataPath) + def filterQuery: DataFrame = + df.filter(df("clicks") <= 2000).select(df("query")) + val baseQuery = filterQuery + + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "false") { + val filter = filterQuery + assert( + baseQuery.queryExecution.optimizedPlan.equals(filter.queryExecution.optimizedPlan)) + } + + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") { + val filter = filterQuery + val planWithHybridScan = filter.queryExecution.optimizedPlan + assert(!baseQuery.queryExecution.optimizedPlan.equals(planWithHybridScan)) + + val deletedFilesList = planWithHybridScan collect { + case Filter( + Not(In(attr, deletedFileNames)), + LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // Check new filter condition on lineage column. + assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_COLUMN)) + val deleted = deletedFileNames.map(_.toString) + assert(deleted.length === 2) + assert(deleted.distinct.length === deleted.length) + assert(deleted.forall(f => !df.inputFiles.contains(f))) + // Check the location is replaced with index data files properly. 
+ assert(fsRelation.location.inputFiles.forall(_.contains(indexName))) + assert(fsRelation.location.inputFiles.length === 4) + deleted + } + assert(deletedFilesList.length === 1) + val deletedFiles = deletedFilesList.flatten + assert(deletedFiles.length === 2) + + val execPlan = spark.sessionState.executePlan(planWithHybridScan).executedPlan + val execNodes = execPlan collect { + case p @ FileSourceScanExec(_, _, _, _, _, dataFilters, _) => + // Check filter pushed down properly. + val filterStr = dataFilters.toString + assert(filterStr.contains(" <= 2000)")) + // Check deleted files. + assert(deletedFiles.forall(filterStr.contains)) + p + } + assert(execNodes.length === 1) + + checkAnswer(baseQuery, filter) + } + } + } + + test("Delete-only: join index, deleted files should be excluded from each index relation.") { + val df1 = spark.read.parquet(sampleParquetDataLocationDelete) + val df2 = spark.read.parquet(sampleParquetDataLocationDelete3) + def joinQuery(): DataFrame = { + val query = df1.filter(df1("clicks") >= 2000).select(df1("clicks"), df1("query")) + val query2 = df2.filter(df2("clicks") <= 4000).select(df2("clicks"), df2("Date")) + query.join(query2, "clicks") + } + val baseQuery = joinQuery + + withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") { + withSQLConf("spark.hyperspace.index.hybridscan.enabled" -> "false") { + val join = joinQuery() + assert(join.queryExecution.optimizedPlan.equals(baseQuery.queryExecution.optimizedPlan)) + } + + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") { + val join = joinQuery() + val planWithHybridScan = join.queryExecution.optimizedPlan + assert(!baseQuery.queryExecution.optimizedPlan.equals(planWithHybridScan)) + + val deletedFilesList = planWithHybridScan collect { + case Filter( + Not(In(attr, deletedFileNames)), + LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // Check new filter condition on lineage column. 
+ assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_COLUMN)) + val deleted = deletedFileNames.map(_.toString) + assert(deleted.length === 2) + assert(deleted.distinct.length === deleted.length) + assert(deleted.forall(f => !df1.inputFiles.contains(f))) + assert(deleted.forall(f => !df2.inputFiles.contains(f))) + assert( + fsRelation.location.inputFiles.forall(_.contains("index_ParquetDelete")) || + fsRelation.location.inputFiles.forall(_.contains("indexType2_ParquetDelete3"))) + assert(fsRelation.location.inputFiles.length === 4) + deleted + } + assert(deletedFilesList.length === 2) + var deletedFiles = deletedFilesList.flatten + assert(deletedFiles.length === 4) + + val execPlan = spark.sessionState.executePlan(planWithHybridScan).executedPlan + val execNodes = execPlan collect { + case p @ FileSourceScanExec(_, _, _, _, _, dataFilters, _) => + // Check filter pushed down properly. + val filterStr = dataFilters.toString + assert(filterStr.contains(" >= 2000)") && filterStr.contains(" <= 4000)")) + // Check deleted files in the push down condition and remove from the list. 
+ deletedFiles = deletedFiles.filterNot(filterStr.contains(_)) + p + } + assert(deletedFiles.isEmpty) + assert(execNodes.count(_.isInstanceOf[FileSourceScanExec]) === 2) + + checkAnswer(join, baseQuery) + } + } + } + + test( + "Append+Delete: filter index & parquet format, " + + "appended files should be handled with additional plan and merged by Union.") { + val df = spark.read.parquet(sampleParquetDataLocationBoth) + def filterQuery: DataFrame = + df.filter(df("clicks") <= 2000).select(df("query")) + val baseQuery = filterQuery + + withSQLConf("spark.hyperspace.index.hybridscan.enabled" -> "false") { + val filter = filterQuery + assert(baseQuery.queryExecution.optimizedPlan.equals(filter.queryExecution.optimizedPlan)) + } + + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") { + val filter = filterQuery + val planWithHybridScan = filter.queryExecution.optimizedPlan + assert(!baseQuery.queryExecution.optimizedPlan.equals(planWithHybridScan)) + + var deletedFilesList: Seq[Seq[String]] = Nil + val nodes = planWithHybridScan collect { + case p @ Filter( + Not(In(attr, deletedFileNames)), + LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // Check new filter condition on lineage column. + assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_COLUMN)) + val deleted = deletedFileNames.map(_.toString) + assert(deleted.length === 1) + assert(deleted.distinct.length === deleted.length) + assert(deleted.forall(f => !df.inputFiles.contains(f))) + assert(fsRelation.location.inputFiles.forall(_.contains("index_ParquetBoth"))) + assert(fsRelation.location.inputFiles.length === 4) + deletedFilesList = deletedFilesList :+ deleted + p + case p: Union => + p + case p @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + if (fsRelation.location.inputFiles.exists(_.contains(".copy"))) { + // Check input files for appended files. 
+ assert(fsRelation.location.inputFiles.length === 1) + } else { + // Check input files for index data files. + assert(fsRelation.location.inputFiles.forall(_.contains("index_ParquetBoth"))) + assert(fsRelation.location.inputFiles.length === 4) + } + p + } + assert(nodes.count(_.isInstanceOf[Filter]) === 1) + assert(nodes.count(_.isInstanceOf[Union]) === 1) + assert(nodes.count(_.isInstanceOf[LogicalRelation]) === 2) + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val execPlan = spark.sessionState.executePlan(planWithHybridScan).executedPlan + var deletePushDownFilterFound = false + val execNodes = execPlan collect { + case p @ UnionExec(children) => + assert(children.size === 2) + assert(children.head.isInstanceOf[ProjectExec]) // index data + assert(children.last.isInstanceOf[ProjectExec]) // appended data + p + case p @ FileSourceScanExec(_, _, _, _, _, dataFilters, _) => + // Check filter pushed down properly. + val filterStr = dataFilters.toString + assert(filterStr.contains(" <= 2000)")) + if (filterStr.contains(IndexConstants.DATA_FILE_NAME_COLUMN)) { + assert(deletedFilesList.flatten.forall(filterStr.contains(_))) + assert(!deletePushDownFilterFound) + deletePushDownFilterFound = true + } + p + case _: ShuffleExchangeExec => + // Make sure there is no shuffle. + fail("ShuffleExchangeExec node found") + } + + assert(execNodes.count(_.isInstanceOf[UnionExec]) === 1) + // 1 of index, 1 of appended file + assert(execNodes.count(_.isInstanceOf[FileSourceScanExec]) === 2) + assert(deletePushDownFilterFound) + + checkAnswer(baseQuery, filter) + } + } + } + + test( + "Append+Delete: join index, appended data should be shuffled with indexed columns " + + "and merged by BucketUnion and deleted files are handled with index data.") { + // One relation has both deleted & appended files and the other one has only deleted files. 
+ val df1 = spark.read.parquet(sampleParquetDataLocationBoth) + val df2 = spark.read.parquet(sampleParquetDataLocationDelete3) + def joinQuery(): DataFrame = { + val query = df1.filter(df1("clicks") >= 2000).select(df1("clicks"), df1("query")) + val query2 = df2.filter(df2("clicks") <= 4000).select(df2("clicks"), df2("Date")) + query.join(query2, "clicks") + } + val baseQuery = joinQuery + + withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") { + withSQLConf("spark.hyperspace.index.hybridscan.enabled" -> "false") { + val join = joinQuery() + assert(join.queryExecution.optimizedPlan.equals(baseQuery.queryExecution.optimizedPlan)) + } + + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") { + val join = joinQuery() + val planWithHybridScan = join.queryExecution.optimizedPlan + assert(!baseQuery.queryExecution.optimizedPlan.equals(planWithHybridScan)) + + var deletedFilesList: Seq[Seq[String]] = Nil + val nodes = planWithHybridScan collect { + case p @ Filter( + Not(In(attr, deletedFileNames)), + LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // Check new filter condition on lineage column. 
+ assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_COLUMN)) + assert(deletedFileNames.length === 1 || deletedFileNames.length === 2) + val deleted = deletedFileNames.map(_.toString) + assert(deleted.distinct.length === deleted.length) + assert(deleted.forall(f => !df1.inputFiles.contains(f))) + assert(deleted.forall(f => !df2.inputFiles.contains(f))) + assert( + fsRelation.location.inputFiles.forall(_.contains("index_ParquetBoth")) || + fsRelation.location.inputFiles.forall(_.contains("indexType2_ParquetDelete3"))) + assert(fsRelation.location.inputFiles.length === 4) + deletedFilesList = deletedFilesList :+ deleted + p + case p @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + val appendedFileCnt = fsRelation.location.inputFiles.count(_.contains(".copy")) + val indexFileCnt = fsRelation.location.inputFiles.count(_.contains("index")) + if (appendedFileCnt > 0) { + assert(appendedFileCnt === 1) + assert(fsRelation.location.inputFiles.length === 1) + } else { + assert( + fsRelation.location.inputFiles.forall(_.contains("index_ParquetBoth")) || + fsRelation.location.inputFiles.forall(_.contains("indexType2_ParquetDelete3"))) + assert(indexFileCnt === 4) + assert(fsRelation.location.inputFiles.length === 4) + } + p + case p: BucketUnion => p + } + // 2 LogicalRelation for index with append+delete, 1 for index with delete. + assert(nodes.count(_.isInstanceOf[LogicalRelation]) === 3) + // 1 BucketUnion for index with append+delete. + assert(nodes.count(_.isInstanceOf[BucketUnion]) === 1) + // 2 Filter-Not-In nodes for index with delete. 
+ assert(nodes.count(_.isInstanceOf[Filter]) === 2) + assert(deletedFilesList.length === 2) + var deletedFiles = deletedFilesList.flatten + assert(deletedFiles.length === 3) + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val execPlan = spark.sessionState.executePlan(planWithHybridScan).executedPlan + var deleteFilesPushDownFilterCnt = 0 + val execNodes = execPlan collect { + case p @ BucketUnionExec(children, bucketSpec) => + assert(children.size === 2) + // children.head is always the index plan. + assert(children.head.isInstanceOf[ProjectExec]) // index data + assert(children.last.isInstanceOf[ShuffleExchangeExec]) // appended data + assert(bucketSpec.bucketColumnNames.length === 1) + assert(bucketSpec.bucketColumnNames.head.equals("clicks")) + assert(bucketSpec.numBuckets === 200) + p + case p @ FileSourceScanExec(_, _, _, _, _, dataFilters, _) => + // Check filter pushed down properly. + val filterStr = dataFilters.toString + assert(filterStr.contains(" >= 2000)") && filterStr.contains(" <= 4000)")) + // Check deleted files. + if (filterStr.contains(IndexConstants.DATA_FILE_NAME_COLUMN)) { + deletedFiles = deletedFiles.filterNot(filterStr.contains(_)) + deleteFilesPushDownFilterCnt += 1 + } + p + } + // Check all deleted files are present in Push-down filter condition. + assert(deletedFiles.isEmpty) + assert(execNodes.count(_.isInstanceOf[BucketUnionExec]) === 1) + // 2 of index data, 1 of appended file. 
+ assert(execNodes.count(_.isInstanceOf[FileSourceScanExec]) === 3) + assert(deleteFilesPushDownFilterCnt === 2) + + checkAnswer(baseQuery, join) + } + } + } + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index f87d22bf8..757592ca2 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig} +import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants} import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { @@ -90,14 +90,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { val indexManager = IndexCollectionManager(spark) val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) - assert(RuleUtils.getCandidateIndexes(allIndexes, t1ProjectNode, false).length === 3) - assert(RuleUtils.getCandidateIndexes(allIndexes, t2ProjectNode, false).length === 2) + assert(RuleUtils.getCandidateIndexes(spark, allIndexes, t1ProjectNode).length === 3) + assert(RuleUtils.getCandidateIndexes(spark, allIndexes, t2ProjectNode).length === 2) // Delete an index for t1ProjectNode indexManager.delete("t1i1") val allIndexes2 = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) - assert(RuleUtils.getCandidateIndexes(allIndexes2, t1ProjectNode, false).length === 2) + assert(RuleUtils.getCandidateIndexes(spark, allIndexes2, t1ProjectNode).length === 2) } test("Verify get logical relation for single logical relation node plan.") { @@ -123,21 +123,28 @@ class 
RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { withIndex("index1") { val readDf = spark.read.parquet(dataPath) - val indexFile = readDf.inputFiles.head - indexManager.create(readDf, IndexConfig("index1", Seq("id"))) + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + indexManager.create(readDf, IndexConfig("index1", Seq("id"))) + } val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) def verify( plan: LogicalPlan, hybridScanEnabled: Boolean, + hybridScanDeleteEnabled: Boolean, expectCandidateIndex: Boolean): Unit = { - val indexes = RuleUtils - .getCandidateIndexes(allIndexes, plan, hybridScanEnabled) - if (expectCandidateIndex) { - assert(indexes.length == 1) - assert(indexes.head.name == "index1") - } else { - assert(indexes.isEmpty) + withSQLConf( // Both flags are set explicitly for every lookup. + IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> hybridScanEnabled.toString, + IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED -> + hybridScanDeleteEnabled.toString) { + val indexes = RuleUtils + .getCandidateIndexes(spark, allIndexes, plan) + if (expectCandidateIndex) { + assert(indexes.length == 1) + assert(indexes.head.name == "index1") + } else { + assert(indexes.isEmpty) + } } } @@ -145,8 +152,16 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { // hybrid scan is enabled or not. { val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan - verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = true) - verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true) + verify( + optimizedPlan, + hybridScanEnabled = false, + hybridScanDeleteEnabled = false, + expectCandidateIndex = true) + verify( + optimizedPlan, + hybridScanEnabled = true, + hybridScanDeleteEnabled = false, + expectCandidateIndex = true) } // Scenario #1: Append new files. 
@@ -154,20 +169,38 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { { val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan - verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false) - verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true) + verify( + optimizedPlan, + hybridScanEnabled = false, + hybridScanDeleteEnabled = false, + expectCandidateIndex = false) + verify( + optimizedPlan, + hybridScanEnabled = true, + hybridScanDeleteEnabled = false, + expectCandidateIndex = true) } // Scenario #2: Delete 1 file. - { - FileUtils.delete(new Path(indexFile), isRecursive = false) - } + FileUtils.delete(new Path(readDf.inputFiles.head)) { val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan - verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false) - // TODO: expectedCandidateIndex = true once delete dataset is supported. - verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = false) + verify( + optimizedPlan, + hybridScanEnabled = false, + hybridScanDeleteEnabled = false, + expectCandidateIndex = false) + verify( + optimizedPlan, + hybridScanEnabled = true, + hybridScanDeleteEnabled = false, + expectCandidateIndex = false) + verify( + optimizedPlan, + hybridScanEnabled = true, + hybridScanDeleteEnabled = true, + expectCandidateIndex = true) } // Scenario #3: Replace all files. 
@@ -175,8 +208,16 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { { val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan - verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false) - verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = false) + verify( + optimizedPlan, + hybridScanEnabled = false, + hybridScanDeleteEnabled = false, + expectCandidateIndex = false) + verify( + optimizedPlan, + hybridScanEnabled = true, + hybridScanDeleteEnabled = true, + expectCandidateIndex = false) } } }