diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index c811ab2e5..b46f0ad86 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -19,7 +19,8 @@ package com.microsoft.hyperspace.actions import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} -import org.apache.spark.sql.functions.input_file_name +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.{input_file_name, udf} import org.apache.spark.sql.sources.DataSourceRegister import com.microsoft.hyperspace.HyperspaceException @@ -181,8 +182,24 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) val missingPartitionColumns = getPartitionColumns(df).filter( ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty) val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns + + // File path value in DATA_FILE_NAME_COLUMN column is stored as String. We normalize + // path values in this column by removing extra preceding `/` characters in the path + // and store it the same way paths are stored in Content in an IndexLogEntry instance. + // Normalizing path values in DATA_FILE_NAME_COLUMN column keeps path representation + // unified in Hyperspace (between index lineage and index metadata) and helps performance + // by avoiding the need to fix paths (i.e. removing extra `/` characters) each time + // value of DATA_FILE_NAME_COLUMN column is read. 
+ // Here is an example of normalization: + // - Original raw path (output of input_file_name() udf, before normalization): + // + file:///C:/myGit/hyperspace-1/src/test/part-00003.snappy.parquet + // - Normalized path to be stored in DATA_FILE_NAME_COLUMN column: + // + file:/C:/myGit/hyperspace-1/src/test/part-00003.snappy.parquet + val absolutePath: UserDefinedFunction = udf( + (filePath: String) => PathUtils.makeAbsolute(filePath).toString) + df.select(allIndexColumns.head, allIndexColumns.tail: _*) - .withColumn(IndexConstants.DATA_FILE_NAME_COLUMN, input_file_name()) + .withColumn(IndexConstants.DATA_FILE_NAME_COLUMN, absolutePath(input_file_name())) } else { df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*) } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala index 6b0086cbb..547fa0dfa 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala @@ -17,61 +17,24 @@ package com.microsoft.hyperspace.actions import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.types.{DataType, StructType} -import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshActionEvent} -// TODO: This class depends directly on LogEntry. This should be updated such that -// it works with IndexLogEntry only. (for example, this class can take in -// derivedDataset specific logic for refreshing). +/** + * The Index refresh action is used to perform a full rebuild of the index. + * Consequently, it ends up creating a new version of the index and involves + * a full scan of the underlying source data. + * + * @param spark SparkSession. 
+ * @param logManager Index LogManager for index being refreshed. + * @param dataManager Index DataManager for index being refreshed. + */ class RefreshAction( spark: SparkSession, - final override protected val logManager: IndexLogManager, + logManager: IndexLogManager, dataManager: IndexDataManager) - extends CreateActionBase(dataManager) - with Action { - private lazy val previousLogEntry: LogEntry = { - logManager.getLog(baseId).getOrElse { - throw HyperspaceException("LogEntry must exist for refresh operation") - } - } - - private lazy val previousIndexLogEntry = previousLogEntry.asInstanceOf[IndexLogEntry] - - // Reconstruct a df from schema - private lazy val df = { - val rels = previousIndexLogEntry.relations - // Currently we only support to create an index on a LogicalRelation. - assert(rels.size == 1) - val dataSchema = DataType.fromJson(rels.head.dataSchemaJson).asInstanceOf[StructType] - spark.read - .schema(dataSchema) - .format(rels.head.fileFormat) - .options(rels.head.options) - .load(rels.head.rootPaths: _*) - } - - private lazy val indexConfig: IndexConfig = { - val ddColumns = previousIndexLogEntry.derivedDataset.properties.columns - IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included) - } - - final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) - - final override val transientState: String = REFRESHING - - final override val finalState: String = ACTIVE - - final override def validate(): Unit = { - if (!previousIndexLogEntry.state.equalsIgnoreCase(ACTIVE)) { - throw HyperspaceException( - s"Refresh is only supported in $ACTIVE state. " + - s"Current index state is ${previousIndexLogEntry.state}") - } - } + extends RefreshActionBase(spark, logManager, dataManager) { final override def op(): Unit = { // TODO: The current implementation picks the number of buckets from session config. 
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala new file mode 100644 index 000000000..cf8562055 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -0,0 +1,79 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{DataType, StructType} + +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING} +import com.microsoft.hyperspace.index._ + +/** + * Base abstract class containing common code for different types of index refresh actions. + * + * @param spark SparkSession + * @param logManager Index LogManager for index being refreshed + * @param dataManager Index DataManager for index being refreshed + */ +// TODO: This class depends directly on LogEntry. This should be updated such that +// it works with IndexLogEntry only. (for example, this class can take in +// derivedDataset specific logic for refreshing). 
+private[actions] abstract class RefreshActionBase( + spark: SparkSession, + final override protected val logManager: IndexLogManager, + dataManager: IndexDataManager) + extends CreateActionBase(dataManager) + with Action { + private lazy val previousLogEntry: LogEntry = { + logManager.getLog(baseId).getOrElse { + throw HyperspaceException("LogEntry must exist for refresh operation") + } + } + + protected lazy val previousIndexLogEntry = previousLogEntry.asInstanceOf[IndexLogEntry] + + // Reconstruct a df from schema + protected lazy val df = { + val rels = previousIndexLogEntry.relations + val dataSchema = DataType.fromJson(rels.head.dataSchemaJson).asInstanceOf[StructType] + spark.read + .schema(dataSchema) + .format(rels.head.fileFormat) + .options(rels.head.options) + .load(rels.head.rootPaths: _*) + } + + protected lazy val indexConfig: IndexConfig = { + val ddColumns = previousIndexLogEntry.derivedDataset.properties.columns + IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included) + } + + final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) + + final override val transientState: String = REFRESHING + + final override val finalState: String = ACTIVE + + override def validate(): Unit = { + if (!previousIndexLogEntry.state.equalsIgnoreCase(ACTIVE)) { + throw HyperspaceException( + s"Refresh is only supported in $ACTIVE state. " + + s"Current index state is ${previousIndexLogEntry.state}") + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala new file mode 100644 index 000000000..9668fc8d3 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala @@ -0,0 +1,125 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +import org.apache.hadoop.fs.Path +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions._ + +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer +import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshDeleteActionEvent} + +/** + * Refresh index by removing index entries from any deleted source data file. + * Note this Refresh Action only fixes an index w.r.t deleted source data files + * and does not consider new source data files (if any). + * If some original source data file(s) are removed between previous version of index and now, + * this Refresh Action updates the index as follows: + * 1. Deleted source data files are identified; + * 2. Index records' lineage is leveraged to remove any index entry coming + * from those deleted source data files. 
+ * + * @param spark SparkSession + * @param logManager Index LogManager for index being refreshed + * @param dataManager Index DataManager for index being refreshed + */ +class RefreshDeleteAction( + spark: SparkSession, + logManager: IndexLogManager, + dataManager: IndexDataManager) + extends RefreshActionBase(spark, logManager, dataManager) + with Logging { + + final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = { + RefreshDeleteActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message) + } + + /** + * Validate index has lineage column and it is in active state for refreshing and + * there are some deleted source data file(s). + */ + final override def validate(): Unit = { + super.validate() + if (!previousIndexLogEntry.hasLineageColumn(spark)) { + throw HyperspaceException( + "Index refresh (to handle deleted source data) is " + + "only supported on an index with lineage.") + } + + if (deletedFiles.isEmpty) { + throw HyperspaceException("Refresh aborted as no deleted source data file found.") + } + } + + /** + * For an index with lineage, find all the source data files which have been deleted, + * and use index records' lineage to mark and remove index entries which belong to + * deleted source data files as those entries are no longer valid. 
+ */ + final override def op(): Unit = { + logInfo( + "Refresh index is updating index by removing index entries " + + s"corresponding to ${deletedFiles.length} deleted source data files.") + + val refreshDF = + spark.read + .parquet(previousIndexLogEntry.content.files.map(_.toString): _*) + .filter(!col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(deletedFiles: _*)) + + refreshDF.write.saveWithBuckets( + refreshDF, + indexDataPath.toString, + previousIndexLogEntry.numBuckets, + indexConfig.indexedColumns) + } + + /** + * Compare list of source data files from previous IndexLogEntry to list + * of current source data files, validate fileInfo for existing files and + * identify deleted source data files. + */ + private lazy val deletedFiles: Seq[String] = { + val rels = previousIndexLogEntry.relations + val originalFiles = rels.head.data.properties.content.fileInfos + val currentFiles = rels.head.rootPaths + .flatMap { p => + Content + .fromDirectory(path = new Path(p), throwIfNotExists = true) + .fileInfos + } + .map(f => f.name -> f) + .toMap + + var delFiles = Seq[String]() + originalFiles.foreach { f => + currentFiles.get(f.name) match { + case Some(v) => + if (!f.equals(v)) { + throw HyperspaceException( + "Index refresh (to handle deleted source data) aborted. 
" + + s"Existing source data file info is changed (file: ${f.name}).") + } + case None => delFiles :+= f.name + } + } + + delFiles + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index cf354c71d..6d3dc6493 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.internal.SQLConf import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions._ +import com.microsoft.hyperspace.util.HyperspaceConf class IndexCollectionManager( spark: SparkSession, @@ -66,7 +67,11 @@ class IndexCollectionManager( withLogManager(indexName) { logManager => val indexPath = PathResolver(spark.sessionState.conf).getIndexPath(indexName) val dataManager = indexDataManagerFactory.create(indexPath) - new RefreshAction(spark, logManager, dataManager).run() + if (HyperspaceConf.refreshDeleteEnabled(spark)) { + new RefreshDeleteAction(spark, logManager, dataManager).run() + } else { + new RefreshAction(spark, logManager, dataManager).run() + } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 91a5c929b..d1a163d49 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -54,4 +54,7 @@ object IndexConstants { private[hyperspace] val DATA_FILE_NAME_COLUMN = "_data_file_name" val INDEX_LINEAGE_ENABLED = "spark.hyperspace.index.lineage.enabled" val INDEX_LINEAGE_ENABLED_DEFAULT = "false" + + val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled" + val REFRESH_DELETE_ENABLED_DEFAULT = "false" } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala 
b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 9646db2d6..e99b10de1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -24,11 +24,12 @@ import scala.collection.mutable.{HashMap, ListBuffer} import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.util.{PathUtils, ResolverUtils} // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. case class NoOpFingerprint() { @@ -402,6 +403,12 @@ case class IndexLogEntry( sourcePlanSignatures.head } + def hasLineageColumn(spark: SparkSession): Boolean = { + ResolverUtils + .resolve(spark, IndexConstants.DATA_FILE_NAME_COLUMN, schema.fieldNames) + .isDefined + } + override def hashCode(): Int = { config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode } diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala index b80c0a760..386d50d9d 100644 --- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala +++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala @@ -105,9 +105,21 @@ case class RefreshActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: S case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) extends HyperspaceIndexCRUDEvent +/** + * Index Refresh Event for deleted source files. 
Emitted when refresh is called on an index + * with config flag set to remove index entries for deleted source data files. + * + * @param appInfo AppInfo for spark application. + * @param index Related index. + * @param message Message about event. + */ +case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) + extends HyperspaceIndexCRUDEvent + /** * Index usage event. This event is emitted when an index is picked instead of original data * source by one of the hyperspace rules. + * * @param appInfo AppInfo for spark application. * @param indexes List of selected indexes for this plan. * @param planBeforeRule Original plan before application of indexes. diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index 4f882d325..f2b338332 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -31,4 +31,11 @@ object HyperspaceConf { IndexConstants.INDEX_HYBRID_SCAN_ENABLED_DEFAULT) .toBoolean } + + def refreshDeleteEnabled(spark: SparkSession): Boolean = { + spark.conf + .get(IndexConstants.REFRESH_DELETE_ENABLED, + IndexConstants.REFRESH_DELETE_ENABLED_DEFAULT) + .toBoolean + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTests.scala index ddfdb3073..820b478e9 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTests.scala @@ -43,8 +43,7 @@ class CreateIndexTests extends HyperspaceSuite with SQLHelper { override def beforeAll(): Unit = { super.beforeAll() - val sparkSession = spark - hyperspace = new Hyperspace(sparkSession) + hyperspace = new Hyperspace(spark) FileUtils.delete(new Path(testDir), true) val dataColumns = Seq("Date", "RGUID", "Query", "imprs", 
"clicks") diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala index 9d8f0410a..944cf3b42 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule} -import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { private val testDir = "src/test/resources/e2eTests/" @@ -404,6 +404,56 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { assert(!spark.isHyperspaceEnabled()) } + test("Validate index usage after refresh with some source data file deleted.") { + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "true", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + + // Save a copy of source data files. + val location = testDir + "ixRefreshTest" + val dataPath = new Path(location, "*parquet") + val dataColumns = Seq("c1", "c2", "c3", "c4", "c5") + SampleData.save(spark, location, dataColumns) + + // Create index on original source data files. + val df = spark.read.parquet(location) + val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1")) + hyperspace.createIndex(df, indexConfig) + + // Verify index usage for index version (v=0). + def query1(): DataFrame = + spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") + + verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName)) + + // Delete some source data file. 
+ val dataFileNames = dataPath + .getFileSystem(new Configuration) + .globStatus(dataPath) + .map(_.getPath) + + assert(dataFileNames.nonEmpty) + val fileToDelete = dataFileNames.head + FileUtils.delete(fileToDelete) + + def query2(): DataFrame = + spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") + + // Verify index is not used. + spark.enableHyperspace() + val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan) + spark.disableHyperspace() + assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location)))) + + // Refresh the index to remove deleted source data file records from index. + hyperspace.refreshIndex(indexConfig.indexName) + + // Verify index usage on latest version of index (v=1) after refresh. + verifyIndexUsage(query2, getIndexFilesPath(indexConfig.indexName, 1)) + FileUtils.delete(new Path(location)) // dataPath is a glob pattern. + } + } + /** * Check that if the query plan has the expected rootPaths. * @@ -434,10 +484,13 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { }.flatten } - private def getIndexFilesPath(indexName: String): Seq[Path] = { - Content.fromDirectory(new Path( - systemPath, - s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")).files + private def getIndexFilesPath(indexName: String, version: Int = 0): Seq[Path] = { + Content + .fromDirectory( + new Path( + systemPath, + s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$version")) + .files } /** diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index 39ea24ab1..8ce970cbf 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -67,7 +67,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { StructType(Seq(StructField("RGUID", StringType), StructField("Date", StringType))) if (enableLineage) { 
expectedSchema = expectedSchema.add( - StructField(IndexConstants.DATA_FILE_NAME_COLUMN, StringType, false)) + StructField(IndexConstants.DATA_FILE_NAME_COLUMN, StringType)) } val expected = new IndexSummary( indexConfig1.indexName, diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala new file mode 100644 index 000000000..90bd8453a --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -0,0 +1,236 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{AnalysisException, QueryTest} + +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData} +import com.microsoft.hyperspace.util.FileUtils + +/** + * Unit E2E test cases for RefreshIndex. 
+ */ +class RefreshIndexTests extends QueryTest with HyperspaceSuite { + override val systemPath = new Path("src/test/resources/indexLocation") + private val testDir = "src/test/resources/RefreshIndexDeleteTests/" + private val nonPartitionedDataPath = testDir + "nonpartitioned" + private val partitionedDataPath = testDir + "partitioned" + private val indexConfig = IndexConfig("index1", Seq("Query"), Seq("imprs")) + private var hyperspace: Hyperspace = _ + + override def beforeAll(): Unit = { + super.beforeAll() + + hyperspace = new Hyperspace(spark) + FileUtils.delete(new Path(testDir)) + } + + override def afterAll(): Unit = { + FileUtils.delete(new Path(testDir)) + super.afterAll() + } + + after { + FileUtils.delete(new Path(testDir)) + FileUtils.delete(systemPath) + } + + test("Validate refresh index when some file gets deleted from the source data.") { + // Save test data non-partitioned. + SampleData.save( + spark, + nonPartitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) + + // Save test data partitioned. + SampleData.save( + spark, + partitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks"), + Some(Seq("Date", "Query"))) + val partitionedDataDF = spark.read.parquet(partitionedDataPath) + + Seq(nonPartitionedDataPath, partitionedDataPath).foreach { loc => + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "true", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withIndex(indexConfig.indexName) { + val dfToIndex = + if (loc.equals(nonPartitionedDataPath)) nonPartitionedDataDF else partitionedDataDF + hyperspace.createIndex(dfToIndex, indexConfig) + + // Delete one source data file. + val deletedFile = if (loc.equals(nonPartitionedDataPath)) { + deleteDataFile(nonPartitionedDataPath) + } else { + deleteDataFile(partitionedDataPath, true) + } + + // Validate only index records whose lineage is the deleted file are removed. 
+ val originalIndexDF = spark.read.parquet(s"$systemPath/${indexConfig.indexName}/" + + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0") + val originalIndexWithoutDeletedFile = originalIndexDF + .filter(s"""${IndexConstants.DATA_FILE_NAME_COLUMN} != "$deletedFile"""") + + hyperspace.refreshIndex(indexConfig.indexName) + + val refreshedIndexDF = spark.read.parquet(s"$systemPath/${indexConfig.indexName}/" + + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") + + checkAnswer(originalIndexWithoutDeletedFile, refreshedIndexDF) + } + } + } + } + + test( + "Validate refresh index (to handle deletes from the source data) " + + "fails as expected on an index without lineage.") { + SampleData.save( + spark, + nonPartitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) + + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "false", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + hyperspace.createIndex(nonPartitionedDataDF, indexConfig) + + deleteDataFile(nonPartitionedDataPath) + + val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + assert( + ex.getMessage.contains(s"Index refresh (to handle deleted source data) is " + + "only supported on an index with lineage.")) + } + } + + test( + "Validate refresh index (to handle deletes from the source data) " + + "is aborted if no source data file is deleted.") { + SampleData.save( + spark, + nonPartitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) + + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "true", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + hyperspace.createIndex(nonPartitionedDataDF, indexConfig) + + val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + assert(ex.getMessage.contains("Refresh aborted as no deleted source data file 
found.")) + } + } + + test( + "Validate refresh index (to handle deletes from the source data) " + + "fails as expected when all source data files are deleted.") { + Seq(true, false).foreach { deleteDataFolder => + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "true", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + SampleData.save( + spark, + nonPartitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) + + hyperspace.createIndex(nonPartitionedDataDF, indexConfig) + + if (deleteDataFolder) { + FileUtils.delete(new Path(nonPartitionedDataPath)) + + val ex = intercept[AnalysisException](hyperspace.refreshIndex(indexConfig.indexName)) + assert(ex.getMessage.contains("Path does not exist")) + + } else { + val dataPath = new Path(nonPartitionedDataPath, "*parquet") + dataPath + .getFileSystem(new Configuration) + .globStatus(dataPath) + .foreach(p => FileUtils.delete(p.getPath)) + + val ex = + intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + assert(ex.getMessage.contains("Invalid plan for creating an index.")) + } + FileUtils.delete(new Path(nonPartitionedDataPath)) + FileUtils.delete(systemPath) + } + } + } + + test( + "Validate refresh index (to handle deletes from the source data) " + + "fails as expected when file info for an existing source data file changes.") { + SampleData.save( + spark, + nonPartitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) + + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "true", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + hyperspace.createIndex(nonPartitionedDataDF, indexConfig) + + // Replace a source data file with a new file with same name but different properties. 
+ val deletedFile = deleteDataFile(nonPartitionedDataPath) + FileUtils.createFile( + deletedFile.getFileSystem(new Configuration), + deletedFile, + "I am some random content :).") + + val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + assert( + ex.getMessage.contains("Index refresh (to handle deleted source data) aborted. " + + "Existing source data file info is changed")) + } + } + + /** + * Delete one file from a given path. + * + * @param path Path to the parent folder containing data files. + * @param isPartitioned Is data folder partitioned or not. + * @return Path to the deleted file. + */ + private def deleteDataFile(path: String, isPartitioned: Boolean = false): Path = { + val dataPath = if (isPartitioned) { + new Path(s"$path/*/*", "*parquet") + } else { + new Path(path, "*parquet") + } + + val dataFileNames = dataPath + .getFileSystem(new Configuration) + .globStatus(dataPath) + .map(_.getPath) + + assert(dataFileNames.nonEmpty) + val fileToDelete = dataFileNames.head + FileUtils.delete(fileToDelete) + + fileToDelete + } +}